diff --git a/aws/logs_monitoring_go/cmd/forwarder/main.go b/aws/logs_monitoring_go/cmd/forwarder/main.go index d31ad219a..46ba69d2e 100644 --- a/aws/logs_monitoring_go/cmd/forwarder/main.go +++ b/aws/logs_monitoring_go/cmd/forwarder/main.go @@ -8,11 +8,9 @@ package main import ( "context" "encoding/json" - "errors" + "fmt" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline" @@ -33,24 +31,15 @@ func main() { panic(err) } - cwHandler := handling.NewCloudwatch(cfg) - kinesisHandler := handling.NewKinesis(cfg) - s3Handler := handling.NewS3(cfg) - handling.Register(parsing.InvocationSourceCloudwatchLogs, cwHandler) - handling.Register(parsing.InvocationSourceKinesis, kinesisHandler) - handling.Register(parsing.InvocationSourceS3, s3Handler) - lambda.Start(handleRequest(cfg)) } func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawMessage) error { return func(ctx context.Context, event json.RawMessage) error { - invocation := parsing.DetectInvocationSource(event) - if invocation == parsing.InvocationSourceUnknown { - return errors.New("unknown invocation") + parsed, err := parsing.Parse(event) + if err != nil { + return fmt.Errorf("parse: %w", err) } - - run := pipeline.NewRun(cfg, handling.Handlers[invocation], forwarding.Storage(invocation)) - return pipeline.Start(ctx, event, run) + return pipeline.Start(ctx, parsed, cfg) } } diff --git a/aws/logs_monitoring_go/internal/forwarding/storage.go b/aws/logs_monitoring_go/internal/forwarding/storage.go index 292ac75e9..53b1bd03b 100644 --- a/aws/logs_monitoring_go/internal/forwarding/storage.go +++ 
b/aws/logs_monitoring_go/internal/forwarding/storage.go @@ -12,11 +12,11 @@ const ( s3Storage = "s3" ) -func Storage(source parsing.InvocationSource) string { - switch source { - case parsing.InvocationSourceS3: +func StorageFromContentType(contentType parsing.ContentType) string { + switch contentType { + case parsing.ContentTypeS3: return s3Storage - case parsing.InvocationSourceCloudwatchLogs, parsing.InvocationSourceKinesis: + case parsing.ContentTypeCloudwatchLogs, parsing.ContentTypeKinesis: return cloudwatchStorage default: return "" diff --git a/aws/logs_monitoring_go/internal/handling/cloudtrail.go b/aws/logs_monitoring_go/internal/handling/cloudtrail.go new file mode 100644 index 000000000..9e328eb24 --- /dev/null +++ b/aws/logs_monitoring_go/internal/handling/cloudtrail.go @@ -0,0 +1,118 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. 
+ +package handling + +import ( + "compress/gzip" + "encoding/json" + "errors" + "fmt" + "io" + "iter" + "log/slog" + "regexp" + "strings" +) + +const ( + cloudTrailARNKey = "arn" + cloudTrailUserIdentityKey = "userIdentity" +) + +var ( + cloudTrailRegex = regexp.MustCompile(`\d+_CloudTrail(|-Digest|-Insight)_\w{2}(|-gov|-cn)-\w{4,9}-\d_(|.+)\d{8}T\d{4,6}Z(|.+)\.json\.gz$`) + ec2InstanceRegexp = regexp.MustCompile(`^arn:aws:sts::.*?:assumed-role/(?P<role>.*?)/(?P<host>i-([0-9a-f]{8}|[0-9a-f]{17}))$`) +) + +func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] { + return func(yield func(string, error) bool) { + gz, err := gzip.NewReader(r) + if err != nil { + yield("", fmt.Errorf("decode cloudtrail gzip: %w", err)) + return + } + defer gz.Close() //nolint:errcheck + + dec := json.NewDecoder(gz) + + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + yield("", errors.New("decode cloudtrail: expected '{' at start of JSON")) + return + } + if t, err := dec.Token(); err != nil || t != "Records" { + yield("", errors.New("decode cloudtrail: expected 'Records' key")) + return + } + if t, err := dec.Token(); err != nil || t != json.Delim('[') { + yield("", errors.New("decode cloudtrail: expected '[' at start of Records array")) + return + } + + for dec.More() { + var raw json.RawMessage + if err := dec.Decode(&raw); err != nil { + yield("", fmt.Errorf("decode cloudtrail record: %w", err)) + return + } + if !yield(string(raw), nil) { + return + } + } + } +} + +func cloudtrailHost(message string) string { + dec := json.NewDecoder(strings.NewReader(message)) + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + return "" + } + + for dec.More() { + key, err := dec.Token() + if err != nil { + return "" + } + if key != cloudTrailUserIdentityKey { + var skip json.RawMessage + if err := dec.Decode(&skip); err != nil { + return "" + } + continue + } + + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + return "" + } + + for dec.More() { + innerKey, 
err := dec.Token() + if err != nil { + return "" + } + if innerKey != cloudTrailARNKey { + var skip json.RawMessage + if err := dec.Decode(&skip); err != nil { + return "" + } + continue + } + + var arn string + if err := dec.Decode(&arn); err != nil { + return "" + } + + matches := ec2InstanceRegexp.FindStringSubmatch(arn) + if matches == nil { + slog.Debug(arn + " arn did not match an EC2 host instance, cloudtrail host extraction skipped") + return "" + } + return matches[ec2InstanceRegexp.SubexpIndex("host")] + } + return "" + } + return "" +} diff --git a/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go b/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go new file mode 100644 index 000000000..b9ea593cd --- /dev/null +++ b/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go @@ -0,0 +1,189 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. 
+ +package handling + +import ( + "bytes" + "testing" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil" + "github.com/google/go-cmp/cmp" +) + +func TestCloudTrailRegex(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + key string + want bool + }{ + "standard cloudtrail": { + key: "601427279990_CloudTrail_us-east-1_20210503T0000Z_QrttGEk4ZcBTLwj5.json.gz", + want: true, + }, + "cloudtrail digest": { + key: "601427279990_CloudTrail-Digest_us-east-1_20210503T0000Z_digest.json.gz", + want: true, + }, + "cloudtrail insight": { + key: "601427279990_CloudTrail-Insight_us-east-1_20210503T0000Z_insight.json.gz", + want: true, + }, + "gov region": { + key: "601427279990_CloudTrail_us-gov-west-1_20210503T0000Z_abc.json.gz", + want: true, + }, + "cn region": { + key: "601427279990_CloudTrail_cn-north-1_20210503T0000Z_abc.json.gz", + want: true, + }, + "not cloudtrail": { + key: "some-random-log-file.json.gz", + want: false, + }, + "waf log": { + key: "aws-waf-logs-something.json.gz", + want: false, + }, + "plain text file": { + key: "access.log", + want: false, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + got := cloudTrailRegex.MatchString(tc.key) + if got != tc.want { + t.Errorf("cloudTrailRegex().MatchString(%q) = %v, want %v", tc.key, got, tc.want) + } + }) + } +} + +func TestCloudtrailHost(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + message string + want string + }{ + "ec2 instance (17)": { + message: `{"userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`, + want: "i-08014e4f62ccf762d", + }, + "ec2 instance (8)": { + message: `{"userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-abcd1234"}}`, + want: "i-abcd1234", + }, + "non ec2 arn": { + message: `{"userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/my-session"}}`, + want: "", + }, + "missing userIdentity": { + 
message: `{"eventName":"DescribeTable"}`, + want: "", + }, + "missing arn": { + message: `{"userIdentity":{"type":"AssumedRole"}}`, + want: "", + }, + "invalid json": { + message: "not json", + want: "", + }, + "empty message": { + message: "", + want: "", + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + if got := cloudtrailHost(tc.message); got != tc.want { + t.Errorf("want %q, got %q", tc.want, got) + } + }) + } +} + +func TestDecodeCloudTrail(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + input []byte + want []string + wantErr bool + }{ + "single record": { + input: testutil.MustGzipJSON(t, map[string]any{ + "Records": []any{ + map[string]any{ + "eventName": "DescribeTable", + "userIdentity": map[string]any{ + "arn": "arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d", + }, + }, + }, + }), + want: []string{ + `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`, + }, + }, + "multiple records": { + input: testutil.MustGzipJSON(t, map[string]any{ + "Records": []any{ + map[string]any{"eventName": "event1"}, + map[string]any{"eventName": "event2"}, + }, + }), + want: []string{ + `{"eventName":"event1"}`, + `{"eventName":"event2"}`, + }, + }, + "empty records array": { + input: testutil.MustGzipJSON(t, map[string]any{ + "Records": []any{}, + }), + want: nil, + }, + "invalid gzip": { + input: []byte("not gzip"), + wantErr: true, + }, + "invalid json": { + input: testutil.MustGzipJSON(t, "not an object"), + wantErr: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + var got []string + for msg, err := range decodeCloudTrail(bytes.NewReader(tc.input)) { + if err != nil { + if !tc.wantErr { + t.Fatalf("unexpected error: %v", err) + } + return + } + got = append(got, msg) + } + + if tc.wantErr { + t.Fatal("expected error, got none") + } + if diff := cmp.Diff(tc.want, got); 
diff != "" { + t.Errorf("mismatch (-want +got):\n%s", diff) + } + }) + } +} diff --git a/aws/logs_monitoring_go/internal/handling/cloudwatch.go b/aws/logs_monitoring_go/internal/handling/cloudwatch.go index 913eabe55..a8cc4a751 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudwatch.go +++ b/aws/logs_monitoring_go/internal/handling/cloudwatch.go @@ -42,7 +42,7 @@ func NewCloudwatch(cfg *config.Config) *CloudwatchHandler { } } -func (h CloudwatchHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { +func (h *CloudwatchHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { var cwEvent events.CloudwatchLogsEvent if err := json.Unmarshal(event, &cwEvent); err != nil { return fmt.Errorf("unmarshal: %w", err) @@ -134,6 +134,11 @@ func (h CloudwatchHandler) newCloudwatchLogEntry(event events.CloudwatchLogsLogE entry.Message = message entry.ID = event.ID entry.Timestamp = event.Timestamp + + if entry.Source == sourceCloudtrail { + entry.Host = cloudtrailHost(event.Message) + } + return entry } diff --git a/aws/logs_monitoring_go/internal/handling/cloudwatch_test.go b/aws/logs_monitoring_go/internal/handling/cloudwatch_test.go index bb6c4c4ab..6caf31faf 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudwatch_test.go +++ b/aws/logs_monitoring_go/internal/handling/cloudwatch_test.go @@ -118,6 +118,80 @@ func TestCloudwatchHandler_Handle(t *testing.T) { }, }, }, + "cloudtrail with ec2 host": { + event: testutil.MustCloudwatchEvent(t, testutil.MustGzipJSON(t, map[string]any{ + "messageType": "DATA_MESSAGE", + "owner": "601427279990", + "logGroup": "cloudtrail-logs", + "logStream": "601427279990_CloudTrail_us-east-1", + "logEvents": []map[string]any{ + { + "id": "ct1", + "timestamp": 1620000000000, + "message": `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`, + }, + }, + })), + config: 
testutil.EmptyConfig(), + chanSize: 1, + want: []model.LogEntry{ + { + Message: `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`, + Source: "cloudtrail", + SourceCategory: "aws", + Service: "cloudtrail", + Tags: model.Tags{"service:cloudtrail"}, + Host: "i-08014e4f62ccf762d", + ID: "ct1", + Timestamp: 1620000000000, + Metadata: model.CloudwatchMetadata{ + LambdaOrigin: model.LambdaOrigin{ARN: testutil.ARN}, + Origin: model.CloudwatchOrigin{ + LogGroup: "cloudtrail-logs", + LogStream: "601427279990_CloudTrail_us-east-1", + Owner: "601427279990", + }, + }, + }, + }, + }, + "cloudtrail without ec2 host": { + event: testutil.MustCloudwatchEvent(t, testutil.MustGzipJSON(t, map[string]any{ + "messageType": "DATA_MESSAGE", + "owner": "601427279990", + "logGroup": "cloudtrail-logs", + "logStream": "601427279990_CloudTrail_us-east-1", + "logEvents": []map[string]any{ + { + "id": "ct2", + "timestamp": 1620000000000, + "message": `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:iam::601427279990:user/admin"}}`, + }, + }, + })), + config: testutil.EmptyConfig(), + chanSize: 1, + want: []model.LogEntry{ + { + Message: `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:iam::601427279990:user/admin"}}`, + Source: "cloudtrail", + SourceCategory: "aws", + Service: "cloudtrail", + Tags: model.Tags{"service:cloudtrail"}, + Host: "", + ID: "ct2", + Timestamp: 1620000000000, + Metadata: model.CloudwatchMetadata{ + LambdaOrigin: model.LambdaOrigin{ARN: testutil.ARN}, + Origin: model.CloudwatchOrigin{ + LogGroup: "cloudtrail-logs", + LogStream: "601427279990_CloudTrail_us-east-1", + Owner: "601427279990", + }, + }, + }, + }, + }, "config overrides source, host, service and tags": { event: testutil.MustCloudwatchEvent(t, testutil.MustGzipJSON(t, map[string]any{ "messageType": "DATA_MESSAGE", diff --git a/aws/logs_monitoring_go/internal/handling/eventbridge.go 
b/aws/logs_monitoring_go/internal/handling/eventbridge.go new file mode 100644 index 000000000..06ab7619e --- /dev/null +++ b/aws/logs_monitoring_go/internal/handling/eventbridge.go @@ -0,0 +1,94 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package handling + +import ( + "bytes" + "cmp" + "context" + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" +) + +type EventBridgeHandler struct { + cfg *config.Config +} + +func NewEventBridge(cfg *config.Config) *EventBridgeHandler { + return &EventBridgeHandler{ + cfg: cfg, + } +} + +func (h *EventBridgeHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { + lambdaOrigin, err := model.GetLambdaOrigin(ctx) + if err != nil { + return fmt.Errorf("get lambda origin: %w", err) + } + + ebSource, err := decodeEventBridgeSource(event) + if err != nil { + return err + } + source := cmp.Or(h.cfg.Source, ebSource) + service := cmp.Or(h.cfg.Service, source) + + entry := model.NewLogEntry() + entry.Message = string(event) + entry.Source = source + entry.Service = service + entry.Tags = h.cfg.Tags + entry.Metadata = lambdaOrigin + + if h.cfg.Filter.ShouldExclude(entry.Message) { + return nil + } + + entry.Message = h.cfg.Scrubber.Scrub(entry.Message) + return concurrent.SafeSender(ctx, out, entry) +} + +func decodeEventBridgeSource(event json.RawMessage) (string, error) { + dec := json.NewDecoder(bytes.NewReader(event)) + + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + return "", errors.New("decode 
eventbridge source: expected '{'") + } + + for dec.More() { + key, err := dec.Token() + if err != nil { + return "", fmt.Errorf("decode eventbridge source: read key: %w", err) + } + if key == "source" { + var source string + if err := dec.Decode(&source); err != nil { + return "", fmt.Errorf("decode eventbridge source: %w", err) + } + return eventBridgeSource(source), nil + } + var skip json.RawMessage + if err := dec.Decode(&skip); err != nil { + return "", fmt.Errorf("decode eventbridge source: skip field: %w", err) + } + } + + return "", nil +} + +func eventBridgeSource(source string) string { + _, after, found := strings.Cut(source, ".") + if found { + return after + } + return sourceCloudwatch +} diff --git a/aws/logs_monitoring_go/internal/handling/eventbridge_test.go b/aws/logs_monitoring_go/internal/handling/eventbridge_test.go new file mode 100644 index 000000000..422e94e13 --- /dev/null +++ b/aws/logs_monitoring_go/internal/handling/eventbridge_test.go @@ -0,0 +1,132 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. 
+ +package handling + +import ( + "encoding/json" + "testing" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil" + "github.com/google/go-cmp/cmp" +) + +func TestEventBridgeHandler_Handle(t *testing.T) { + t.Parallel() + + ctx := testutil.LambdaContext(t) + + tests := map[string]struct { + event json.RawMessage + cfg *config.Config + want []model.LogEntry + wantErr bool + }{ + "scheduled event": { + event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`), + cfg: testutil.EmptyConfig(), + want: []model.LogEntry{ + { + Message: `{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`, + Source: "events", + SourceCategory: "aws", + Service: "events", + Metadata: testutil.LambdaOrigin(), + }, + }, + }, + "ec2 event": { + event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{"instance-id":"i-123","state":"running"}}`), + cfg: testutil.EmptyConfig(), + want: []model.LogEntry{ + { + Message: `{"version":"0","id":"abc","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{"instance-id":"i-123","state":"running"}}`, + Source: "ec2", + SourceCategory: "aws", + Service: "ec2", + Metadata: testutil.LambdaOrigin(), + }, + }, + }, + "custom source override": { + event: 
json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`), + cfg: &config.Config{Source: "custom-source"}, + want: []model.LogEntry{ + { + Message: `{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`, + Source: "custom-source", + SourceCategory: "aws", + Service: "custom-source", + Metadata: testutil.LambdaOrigin(), + }, + }, + }, + "invalid JSON": { + event: json.RawMessage(`not json`), + cfg: testutil.EmptyConfig(), + wantErr: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + handler := NewEventBridge(tc.cfg) + out := make(chan model.LogEntry, len(tc.want)) + + err := handler.Handle(ctx, tc.event, out) + close(out) + + if tc.wantErr { + if err == nil { + t.Fatal("expected error, got nil") + } + return + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + var got []model.LogEntry + for entry := range out { + got = append(got, entry) + } + + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Errorf("entries mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func TestEventBridgeSource(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + source string + want string + }{ + "aws.events": {source: "aws.events", want: "events"}, + "aws.ec2": {source: "aws.ec2", want: "ec2"}, + "aws.s3": {source: "aws.s3", want: "s3"}, + "custom.app": {source: "custom.app", want: "app"}, + "no dot": {source: "nodot", want: "cloudwatch"}, + "empty string": {source: "", want: "cloudwatch"}, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + got := eventBridgeSource(tc.source) + if got != tc.want { + t.Errorf("got %q, want %q", got, tc.want) + } + }) + } +} diff --git 
a/aws/logs_monitoring_go/internal/handling/handler.go b/aws/logs_monitoring_go/internal/handling/handler.go index 6edab85a3..49d8b204d 100644 --- a/aws/logs_monitoring_go/internal/handling/handler.go +++ b/aws/logs_monitoring_go/internal/handling/handler.go @@ -8,17 +8,28 @@ package handling import ( "context" "encoding/json" + "fmt" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" ) -var Handlers = make(map[parsing.InvocationSource]Handler) - type Handler interface { Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error } -func Register(invocation parsing.InvocationSource, handler Handler) { - Handlers[invocation] = handler +func NewHandler(ct parsing.ContentType, cfg *config.Config) (Handler, error) { + switch ct { + case parsing.ContentTypeCloudwatchLogs: + return NewCloudwatch(cfg), nil + case parsing.ContentTypeS3: + return NewS3(cfg), nil + case parsing.ContentTypeKinesis: + return NewKinesis(cfg), nil + case parsing.ContentTypeEventBridge: + return NewEventBridge(cfg), nil + default: + return nil, fmt.Errorf("unsupported content type: %v", ct) + } } diff --git a/aws/logs_monitoring_go/internal/handling/kinesis.go b/aws/logs_monitoring_go/internal/handling/kinesis.go index a1caf3426..4ce403f58 100644 --- a/aws/logs_monitoring_go/internal/handling/kinesis.go +++ b/aws/logs_monitoring_go/internal/handling/kinesis.go @@ -26,13 +26,13 @@ func NewKinesis(cfg *config.Config) *KinesisHandler { } } -func (h KinesisHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { +func (h *KinesisHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { var kinesisEvent events.KinesisEvent if err := json.Unmarshal(event, &kinesisEvent); err != nil { return 
fmt.Errorf("unmarshal: %w", err) } - cw := CloudwatchHandler(h) + cw := CloudwatchHandler(*h) for i, record := range kinesisEvent.Records { cwData, err := decompressCloudwatchLogs(record.Kinesis.Data) if err != nil { diff --git a/aws/logs_monitoring_go/internal/handling/s3.go b/aws/logs_monitoring_go/internal/handling/s3.go index 955d52774..4a3c35e0e 100644 --- a/aws/logs_monitoring_go/internal/handling/s3.go +++ b/aws/logs_monitoring_go/internal/handling/s3.go @@ -10,6 +10,7 @@ import ( "context" "encoding/json" "fmt" + "iter" "log/slog" "slices" "strings" @@ -37,7 +38,7 @@ func NewS3(cfg *config.Config) *S3Handler { } } -func (h S3Handler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { +func (h *S3Handler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { var s3Event events.S3Event if err := json.Unmarshal(event, &s3Event); err != nil { return fmt.Errorf("unmarshal: %w", err) @@ -53,17 +54,17 @@ func (h S3Handler) Handle(ctx context.Context, event json.RawMessage, out chan<- return err } - for _, record := range s3Event.Records { - if err := h.processRecord(ctx, client, out, record, lambdaOrigin); err != nil { + for _, eventRecord := range s3Event.Records { + if err := h.processRecord(ctx, client, out, eventRecord, lambdaOrigin); err != nil { return err } } return nil } -func (h S3Handler) processRecord(ctx context.Context, client S3APIClient, out chan<- model.LogEntry, record events.S3EventRecord, lambdaOrigin model.LambdaOrigin) error { - bucket := record.S3.Bucket.Name - key := record.S3.Object.URLDecodedKey +func (h S3Handler) processRecord(ctx context.Context, client S3APIClient, out chan<- model.LogEntry, eventRecord events.S3EventRecord, lambdaOrigin model.LambdaOrigin) error { + bucket := eventRecord.S3.Bucket.Name + key := eventRecord.S3.Object.URLDecodedKey body, err := getS3Object(ctx, client, bucket, key) if err != nil { @@ -75,32 +76,40 @@ func (h S3Handler) processRecord(ctx 
context.Context, client S3APIClient, out ch } }() - scanner := NewScanner(body, h.cfg.S3MultilineLogRegex) - for scanner.Scan() { - message := strings.ToValidUTF8(scanner.Text(), "") - entry := h.newS3LogEntry(record, message, lambdaOrigin) + var records iter.Seq2[string, error] + isCloudTrail := cloudTrailRegex.MatchString(key) + if isCloudTrail { + records = decodeCloudTrail(body) + } else { + records = scan(body, h.cfg.S3MultilineLogRegex) + } + + for message, err := range records { + if err != nil { + return err + } + entry := h.newS3LogEntry(eventRecord, message, lambdaOrigin) if h.cfg.Filter.ShouldExclude(entry.Message) { continue } + if isCloudTrail { + entry.Host = cloudtrailHost(message) + } entry.Message = h.cfg.Scrubber.Scrub(entry.Message) if err := concurrent.SafeSender(ctx, out, entry); err != nil { return err } } - - if err := scanner.Err(); err != nil { - return err - } return nil } -func (h S3Handler) newS3LogEntry(record events.S3EventRecord, message string, lambdaOrigin model.LambdaOrigin) model.LogEntry { - key := record.S3.Object.URLDecodedKey +func (h S3Handler) newS3LogEntry(eventRecord events.S3EventRecord, message string, lambdaOrigin model.LambdaOrigin) model.LogEntry { + key := eventRecord.S3.Object.URLDecodedKey metadata := model.S3Metadata{ LambdaOrigin: lambdaOrigin, Origin: model.S3Origin{ - Bucket: record.S3.Bucket.Name, + Bucket: eventRecord.S3.Bucket.Name, Key: key, }, } diff --git a/aws/logs_monitoring_go/internal/handling/s3_test.go b/aws/logs_monitoring_go/internal/handling/s3_test.go index cca3f9158..9034380c1 100644 --- a/aws/logs_monitoring_go/internal/handling/s3_test.go +++ b/aws/logs_monitoring_go/internal/handling/s3_test.go @@ -6,6 +6,7 @@ package handling import ( + "bytes" "errors" "io" "regexp" @@ -21,22 +22,32 @@ import ( "go.uber.org/mock/gomock" ) -var testS3Record = events.S3EventRecord{ - S3: events.S3Entity{ - Bucket: events.S3Bucket{Name: "b"}, - Object: events.S3Object{URLDecodedKey: "k"}, - }, -} +var ( + 
testS3EventRecord = events.S3EventRecord{ + S3: events.S3Entity{ + Bucket: events.S3Bucket{Name: "b"}, + Object: events.S3Object{URLDecodedKey: "k"}, + }, + } + + testCloudTrailKey = "601427279990_CloudTrail_us-east-1_20210503T0000Z_QrttGEk4ZcBTLwj5.json.gz" + testCloudTrailEventRecord = events.S3EventRecord{ + S3: events.S3Entity{ + Bucket: events.S3Bucket{Name: "trail-bucket"}, + Object: events.S3Object{URLDecodedKey: testCloudTrailKey}, + }, + } +) func TestProcessS3Record(t *testing.T) { t.Parallel() tests := map[string]struct { - mockSetup func(m *MockS3APIClient) - cfg *config.Config - chanSize int - want []model.LogEntry - wantErr bool + mockSetup func(m *MockS3APIClient) + cfg *config.Config + eventRecord events.S3EventRecord + want []model.LogEntry + wantErr bool }{ "single line": { mockSetup: func(m *MockS3APIClient) { @@ -56,8 +67,8 @@ func TestProcessS3Record(t *testing.T) { Body: io.NopCloser(strings.NewReader("line1\nline2\nline3")), }, nil) }, - cfg: testutil.EmptyConfig(), - chanSize: 3, + cfg: testutil.EmptyConfig(), + eventRecord: testS3EventRecord, want: []model.LogEntry{ wantS3Entry("line1", "s3", "s3", nil), wantS3Entry("line2", "s3", "s3", nil), @@ -71,16 +82,18 @@ func TestProcessS3Record(t *testing.T) { Body: io.NopCloser(strings.NewReader("")), }, nil) }, - cfg: testutil.EmptyConfig(), - want: nil, + cfg: testutil.EmptyConfig(), + eventRecord: testS3EventRecord, + want: nil, }, "s3 error": { mockSetup: func(m *MockS3APIClient) { m.EXPECT().GetObject(gomock.Any(), gomock.Any()). 
Return(nil, errors.New("access denied")) }, - cfg: testutil.EmptyConfig(), - wantErr: true, + cfg: testutil.EmptyConfig(), + eventRecord: testS3EventRecord, + wantErr: true, }, "ddtags extraction": { mockSetup: func(m *MockS3APIClient) { @@ -111,8 +124,8 @@ func TestProcessS3Record(t *testing.T) { Body: io.NopCloser(strings.NewReader("2024-01-15 ERROR NullPointer\n at com.foo.Bar\n2024-01-15 INFO started")), }, nil) }, - cfg: &config.Config{S3MultilineLogRegex: regexp.MustCompile(`\d{4}-\d{2}-\d{2}`)}, - chanSize: 2, + cfg: &config.Config{S3MultilineLogRegex: regexp.MustCompile(`\d{4}-\d{2}-\d{2}`)}, + eventRecord: testS3EventRecord, want: []model.LogEntry{ wantS3Entry("2024-01-15 ERROR NullPointer\n at com.foo.Bar\n", "s3", "s3", nil), wantS3Entry("2024-01-15 INFO started", "s3", "s3", nil), @@ -140,6 +153,58 @@ func TestProcessS3Record(t *testing.T) { chanSize: 1, want: []model.LogEntry{wantS3Entry("line1", "s3", "s3", model.Tags{"env:prod", "team:aws"})}, }, + "cloudtrail with ec2 host": { + mockSetup: func(m *MockS3APIClient) { + data := testutil.MustGzipJSON(t, map[string]any{ + "Records": []any{ + map[string]any{ + "eventName": "DescribeTable", + "userIdentity": map[string]any{ + "arn": "arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d", + }, + }, + }, + }) + m.EXPECT().GetObject(gomock.Any(), gomock.Any()). 
+ Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader(data)), + }, nil) + }, + cfg: testutil.EmptyConfig(), + eventRecord: testCloudTrailEventRecord, + want: []model.LogEntry{ + wantCloudTrailEntry( + `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`, + "i-08014e4f62ccf762d", + ), + }, + }, + "cloudtrail without ec2 host": { + mockSetup: func(m *MockS3APIClient) { + data := testutil.MustGzipJSON(t, map[string]any{ + "Records": []any{ + map[string]any{ + "eventName": "DescribeTable", + "userIdentity": map[string]any{ + "arn": "arn:aws:iam::601427279990:user/admin", + }, + }, + }, + }) + m.EXPECT().GetObject(gomock.Any(), gomock.Any()). + Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader(data)), + }, nil) + }, + cfg: testutil.EmptyConfig(), + eventRecord: testCloudTrailEventRecord, + want: []model.LogEntry{ + wantCloudTrailEntry( + `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:iam::601427279990:user/admin"}}`, + "", + ), + }, + }, } for name, tc := range tests { @@ -150,10 +215,10 @@ func TestProcessS3Record(t *testing.T) { mock := NewMockS3APIClient(ctrl) tc.mockSetup(mock) - out := make(chan model.LogEntry, tc.chanSize) + out := make(chan model.LogEntry, len(tc.want)) handler := NewS3(tc.cfg) - err := handler.processRecord(t.Context(), mock, out, testS3Record, testutil.LambdaOrigin()) + err := handler.processRecord(t.Context(), mock, out, tc.eventRecord, testutil.LambdaOrigin()) close(out) var got []model.LogEntry @@ -177,6 +242,20 @@ func TestProcessS3Record(t *testing.T) { } } +func wantCloudTrailEntry(message, host string) model.LogEntry { + entry := model.NewLogEntry() + entry.Message = message + entry.Host = host + entry.Source = sourceCloudtrail + entry.Service = sourceCloudtrail + entry.Tags = model.Tags{"service:" + sourceCloudtrail} + entry.Metadata = model.S3Metadata{ + LambdaOrigin: testutil.LambdaOrigin(), + Origin: model.S3Origin{Bucket: 
"trail-bucket", Key: testCloudTrailKey}, + } + return entry +} + func wantS3Entry(message, source, service string, tags model.Tags) model.LogEntry { entry := model.NewLogEntry() entry.Message = message diff --git a/aws/logs_monitoring_go/internal/handling/scanner.go b/aws/logs_monitoring_go/internal/handling/scanner.go index 60cd249e8..ab5b63daa 100644 --- a/aws/logs_monitoring_go/internal/handling/scanner.go +++ b/aws/logs_monitoring_go/internal/handling/scanner.go @@ -9,7 +9,9 @@ import ( "bufio" "bytes" "io" + "iter" "regexp" + "strings" ) const ( @@ -77,3 +79,18 @@ func (s *Scanner) splitOnLines(data []byte, atEOF bool) (advance int, token []by return 0, nil, nil } + +func scan(r io.Reader, re *regexp.Regexp) iter.Seq2[string, error] { + return func(yield func(string, error) bool) { + scanner := NewScanner(r, re) + for scanner.Scan() { + message := strings.ToValidUTF8(scanner.Text(), "") + if !yield(message, nil) { + return + } + } + if err := scanner.Err(); err != nil { + yield("", err) + } + } +} diff --git a/aws/logs_monitoring_go/internal/parsing/content.go b/aws/logs_monitoring_go/internal/parsing/content.go new file mode 100644 index 000000000..e0b96c0d6 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/content.go @@ -0,0 +1,24 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. 
+
+package parsing
+
+import "encoding/json"
+
+// ContentType identifies the kind of payload a ParsedEvent carries.
+//
+//go:generate stringer -type ContentType -trimprefix ContentType -output content_string.go
+type ContentType int
+
+// The numeric order of these constants is mirrored in the generated
+// content_string.go; re-run go generate after changing it.
+const (
+	// ContentTypeUnknown is the zero value.
+	ContentTypeUnknown ContentType = iota
+	// ContentTypeCloudwatchLogs is assigned to payloads with a top-level
+	// "awslogs" key.
+	ContentTypeCloudwatchLogs
+	// ContentTypeS3 is assigned to "Records" payloads whose first record has
+	// eventSource "aws:s3", and to EventBridge S3 "Object Created" events.
+	ContentTypeS3
+	// ContentTypeKinesis is assigned to "Records" payloads whose first record
+	// has eventSource "aws:kinesis".
+	ContentTypeKinesis
+	// ContentTypeEventBridge is assigned to every other EventBridge envelope.
+	ContentTypeEventBridge
+)
+
+// ParsedEvent pairs a classified content type with the raw JSON that is
+// handed to the handler for that type.
+type ParsedEvent struct {
+	// ContentType selects the handler used for Payload.
+	ContentType ContentType
+	// Payload is either the original invocation payload or, for EventBridge
+	// S3 events, a synthetic S3 event built from the envelope's detail.
+	Payload     json.RawMessage
+}
diff --git a/aws/logs_monitoring_go/internal/parsing/content_string.go b/aws/logs_monitoring_go/internal/parsing/content_string.go
new file mode 100644
index 000000000..3a0a6d398
--- /dev/null
+++ b/aws/logs_monitoring_go/internal/parsing/content_string.go
@@ -0,0 +1,28 @@
+// Code generated by "stringer -type ContentType -trimprefix ContentType -output content_string.go"; DO NOT EDIT.
+
+package parsing
+
+import "strconv"
+
+func _() {
+	// An "invalid array index" compiler error signifies that the constant values have changed.
+	// Re-run the stringer command to generate them again.
+	var x [1]struct{}
+	_ = x[ContentTypeUnknown-0]
+	_ = x[ContentTypeCloudwatchLogs-1]
+	_ = x[ContentTypeS3-2]
+	_ = x[ContentTypeKinesis-3]
+	_ = x[ContentTypeEventBridge-4]
+}
+
+const _ContentType_name = "UnknownCloudwatchLogsS3KinesisEventBridge"
+
+var _ContentType_index = [...]uint8{0, 7, 21, 23, 30, 41}
+
+func (i ContentType) String() string {
+	idx := int(i) - 0
+	if i < 0 || idx >= len(_ContentType_index)-1 {
+		return "ContentType(" + strconv.FormatInt(int64(i), 10) + ")"
+	}
+	return _ContentType_name[_ContentType_index[idx]:_ContentType_index[idx+1]]
+}
diff --git a/aws/logs_monitoring_go/internal/parsing/eventbridge.go b/aws/logs_monitoring_go/internal/parsing/eventbridge.go
new file mode 100644
index 000000000..13c0ba726
--- /dev/null
+++ b/aws/logs_monitoring_go/internal/parsing/eventbridge.go
@@ -0,0 +1,136 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2026-Present Datadog, Inc.
+
+package parsing
+
+import (
+	"bytes"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"strings"
+
+	"github.com/aws/aws-lambda-go/events"
+)
+
+const (
+	// eventBridgeSourceS3 is the envelope "source" value of S3 events.
+	eventBridgeSourceS3 = "aws.s3"
+	// eventBridgeDetailTypeS3 is the "detail-type" substring that selects
+	// object-creation events.
+	eventBridgeDetailTypeS3 = "Object Created"
+)
+
+// parseEventBridge classifies an EventBridge envelope.
+//
+// An S3 "Object Created" envelope is converted into a synthetic S3 event and
+// returned as ContentTypeS3. Any other envelope is returned untouched as
+// ContentTypeEventBridge. An error is returned when the envelope cannot be
+// decoded or when an S3 envelope lacks a bucket name or object key.
+func parseEventBridge(event json.RawMessage) ([]ParsedEvent, error) {
+	source, detailType, detail, err := decodeEventBridgeEnvelope(event)
+	if err != nil {
+		return nil, fmt.Errorf("decode eventbridge: %w", err)
+	}
+
+	// Anything that is not an S3 object-creation event is passed through as-is.
+	if source != eventBridgeSourceS3 || !strings.Contains(detailType, eventBridgeDetailTypeS3) {
+		return []ParsedEvent{{ContentTypeEventBridge, event}}, nil
+	}
+
+	s3Event, err := buildS3EventFromEventBridge(detail)
+	if err != nil {
+		return nil, fmt.Errorf("build s3 event from eventbridge: %w", err)
+	}
+	return []ParsedEvent{{ContentTypeS3, s3Event}}, nil
+}
+
+// expectJSONObjectStart consumes the next token from dec and fails unless it
+// is the opening brace of a JSON object. The decoder's own error, if any, is
+// wrapped so malformed input can be told apart from a valid non-object value.
+func expectJSONObjectStart(dec *json.Decoder) error {
+	t, err := dec.Token()
+	if err != nil {
+		return fmt.Errorf("expected '{': %w", err)
+	}
+	if t != json.Delim('{') {
+		return errors.New("expected '{'")
+	}
+	return nil
+}
+
+// decodeEventBridgeEnvelope walks the top-level keys of event once and returns
+// the "source", "detail-type" and raw "detail" values. A key that is absent
+// leaves its result at the zero value; every other key is skipped.
+func decodeEventBridgeEnvelope(event json.RawMessage) (source, detailType string, detail json.RawMessage, err error) {
+	dec := json.NewDecoder(bytes.NewReader(event))
+
+	if err := expectJSONObjectStart(dec); err != nil {
+		return "", "", nil, fmt.Errorf("eventbridge envelope: %w", err)
+	}
+
+	for dec.More() {
+		key, err := dec.Token()
+		if err != nil {
+			return "", "", nil, fmt.Errorf("read key: %w", err)
+		}
+		switch key {
+		case "source":
+			if err := dec.Decode(&source); err != nil {
+				return "", "", nil, fmt.Errorf("decode source: %w", err)
+			}
+		case "detail-type":
+			if err := dec.Decode(&detailType); err != nil {
+				return "", "", nil, fmt.Errorf("decode detail-type: %w", err)
+			}
+		case "detail":
+			if err := dec.Decode(&detail); err != nil {
+				return "", "", nil, fmt.Errorf("decode detail: %w", err)
+			}
+		default:
+			// Consume the value so the decoder is positioned on the next key.
+			var skip json.RawMessage
+			if err := dec.Decode(&skip); err != nil {
+				return "", "", nil, fmt.Errorf("skip field: %w", err)
+			}
+		}
+	}
+
+	return source, detailType, detail, nil
+}
+
+// buildS3EventFromEventBridge converts the detail of an EventBridge S3 event
+// into the JSON of a single-record S3 event so it can be processed like a
+// direct S3 notification. It fails when the detail cannot be decoded or when
+// the bucket name or object key is missing, rather than emitting a record
+// that points at nothing.
+func buildS3EventFromEventBridge(detail json.RawMessage) (json.RawMessage, error) {
+	bucketName, objectKey, err := decodeEventBridgeS3Detail(detail)
+	if err != nil {
+		return nil, fmt.Errorf("decode eventbridge s3 detail: %w", err)
+	}
+	if bucketName == "" {
+		return nil, errors.New("eventbridge s3 detail: missing bucket name")
+	}
+	if objectKey == "" {
+		return nil, errors.New("eventbridge s3 detail: missing object key")
+	}
+
+	// NOTE(review): Key and URLDecodedKey both receive the key exactly as it
+	// appears in the EventBridge detail. If the S3 handler unmarshals this
+	// payload into events.S3Event, confirm that keys containing '+' or '%'
+	// still resolve to the intended object — TODO confirm.
+	s3Event := events.S3Event{
+		Records: []events.S3EventRecord{{
+			EventSource: eventSourceS3,
+			S3: events.S3Entity{
+				Bucket: events.S3Bucket{Name: bucketName},
+				Object: events.S3Object{Key: objectKey, URLDecodedKey: objectKey},
+			},
+		}},
+	}
+	payload, err := json.Marshal(s3Event)
+	if err != nil {
+		return nil, fmt.Errorf("marshal synthetic s3 event: %w", err)
+	}
+	return payload, nil
+}
+
+// decodeEventBridgeS3Detail walks the top-level keys of an EventBridge S3
+// detail object and returns bucket.name and object.key. A key that is absent
+// leaves its result empty; every other key is skipped.
+func decodeEventBridgeS3Detail(detail json.RawMessage) (bucket, key string, err error) {
+	dec := json.NewDecoder(bytes.NewReader(detail))
+
+	if err := expectJSONObjectStart(dec); err != nil {
+		return "", "", fmt.Errorf("eventbridge s3 detail: %w", err)
+	}
+
+	for dec.More() {
+		k, err := dec.Token()
+		if err != nil {
+			return "", "", fmt.Errorf("read key: %w", err)
+		}
+		switch k {
+		case "bucket":
+			var b struct {
+				Name string `json:"name"`
+			}
+			if err := dec.Decode(&b); err != nil {
+				return "", "", fmt.Errorf("decode bucket: %w", err)
+			}
+			bucket = b.Name
+		case "object":
+			var o struct {
+				Key string `json:"key"`
+			}
+			if err := dec.Decode(&o); err != nil {
+				return "", "", fmt.Errorf("decode object: %w", err)
+			}
+			key = o.Key
+		default:
+			// Consume the value so the decoder is positioned on the next key.
+			var skip json.RawMessage
+			if err := dec.Decode(&skip); err != nil {
+				return "", "", fmt.Errorf("skip field: %w", err)
+			}
+		}
+	}
+
+	return bucket, key, nil
+}
diff --git a/aws/logs_monitoring_go/internal/parsing/invocation_source.go b/aws/logs_monitoring_go/internal/parsing/invocation_source.go
deleted file mode 100644
index d37fa41da..000000000
--- a/aws/logs_monitoring_go/internal/parsing/invocation_source.go
+++ /dev/null
@@ -1,89 +0,0 @@
-// Unless explicitly stated otherwise all files in this repository are licensed
-// under the Apache License Version 2.0.
-// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package parsing - -import ( - "bytes" - "encoding/json" -) - -//go:generate stringer -type InvocationSource -trimprefix InvocationSource -output invocation_source_string.go -type InvocationSource int - -const ( - InvocationSourceUnknown InvocationSource = iota - InvocationSourceCloudwatchLogs - InvocationSourceS3 - InvocationSourceSNS - InvocationSourceSQS - InvocationSourceKinesis -) - -func DetectInvocationSource(event json.RawMessage) InvocationSource { - dec := json.NewDecoder(bytes.NewReader(event)) - - t, err := dec.Token() - if err != nil || t != json.Delim('{') { - return InvocationSourceUnknown - } - - key, err := dec.Token() - if err != nil { - return InvocationSourceUnknown - } - if key == "awslogs" { - return InvocationSourceCloudwatchLogs - } - - if key == "Records" { - return detectFromRecords(dec) - } - - return InvocationSourceUnknown -} - -func detectFromRecords(dec *json.Decoder) InvocationSource { - t, err := dec.Token() - if err != nil || t != json.Delim('[') { - return InvocationSourceUnknown - } - - t, err = dec.Token() - if err != nil || t != json.Delim('{') { - return InvocationSourceUnknown - } - - for dec.More() { - key, err := dec.Token() - if err != nil { - return InvocationSourceUnknown - } - if key == "eventSource" { - val, err := dec.Token() - if err != nil { - return InvocationSourceUnknown - } - switch val { - case "aws:s3": - return InvocationSourceS3 - case "aws:sns": - return InvocationSourceSNS - case "aws:sqs": - return InvocationSourceSQS - case "aws:kinesis": - return InvocationSourceKinesis - default: - return InvocationSourceUnknown - } - } - var skip json.RawMessage - if err := dec.Decode(&skip); err != nil { - return InvocationSourceUnknown - } - } - - return InvocationSourceUnknown -} diff --git a/aws/logs_monitoring_go/internal/parsing/invocation_source_bench_test.go 
b/aws/logs_monitoring_go/internal/parsing/invocation_source_bench_test.go deleted file mode 100644 index 5f0ab09d5..000000000 --- a/aws/logs_monitoring_go/internal/parsing/invocation_source_bench_test.go +++ /dev/null @@ -1,67 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package parsing - -import ( - "bytes" - "encoding/json" - "fmt" - "strings" - "testing" -) - -const benchS3Records = 500 - -var ( - smallCWEvent = json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`) - largeS3EventFirst = makeS3EventSourceFirst(benchS3Records) - largeS3EventLast = makeS3EventSourceLast(benchS3Records) -) - -var benchCases = []struct { - name string - event json.RawMessage -}{ - {"SmallCW", smallCWEvent}, - {"LargeS3_SourceFirst", largeS3EventFirst}, - {"LargeS3_SourceLast", largeS3EventLast}, -} - -func BenchmarkDetectInvocationSource(b *testing.B) { - for _, tc := range benchCases { - b.Run(tc.name, func(b *testing.B) { - for b.Loop() { - DetectInvocationSource(tc.event) - } - }) - } -} - -func makeS3EventSourceFirst(nRecords int) json.RawMessage { - var buf bytes.Buffer - buf.WriteString(`{"Records":[`) - for i := range nRecords { - if i > 0 { - buf.WriteByte(',') - } - fmt.Fprintf(&buf, `{"eventSource":"aws:s3","s3":{"bucket":{"name":"bucket-%d"},"object":{"key":"%s"}}}`, i, strings.Repeat("x", 200)) - } - buf.WriteString(`]}`) - return json.RawMessage(buf.Bytes()) -} - -func makeS3EventSourceLast(nRecords int) json.RawMessage { - var buf bytes.Buffer - buf.WriteString(`{"Records":[`) - for i := range nRecords { - if i > 0 { - buf.WriteByte(',') - } - fmt.Fprintf(&buf, `{"s3":{"bucket":{"name":"bucket-%d"},"object":{"key":"%s"}},"eventSource":"aws:s3"}`, i, strings.Repeat("x", 200)) - } - buf.WriteString(`]}`) - return json.RawMessage(buf.Bytes()) -} diff --git 
a/aws/logs_monitoring_go/internal/parsing/invocation_source_string.go b/aws/logs_monitoring_go/internal/parsing/invocation_source_string.go deleted file mode 100644 index 4e659dadd..000000000 --- a/aws/logs_monitoring_go/internal/parsing/invocation_source_string.go +++ /dev/null @@ -1,29 +0,0 @@ -// Code generated by "stringer -type InvocationSource -trimprefix InvocationSource -output invocation_source_string.go"; DO NOT EDIT. - -package parsing - -import "strconv" - -func _() { - // An "invalid array index" compiler error signifies that the constant values have changed. - // Re-run the stringer command to generate them again. - var x [1]struct{} - _ = x[InvocationSourceUnknown-0] - _ = x[InvocationSourceCloudwatchLogs-1] - _ = x[InvocationSourceS3-2] - _ = x[InvocationSourceSNS-3] - _ = x[InvocationSourceSQS-4] - _ = x[InvocationSourceKinesis-5] -} - -const _InvocationSource_name = "UnknownCloudwatchLogsS3SNSSQSKinesis" - -var _InvocationSource_index = [...]uint8{0, 7, 21, 23, 26, 29, 36} - -func (i InvocationSource) String() string { - idx := int(i) - 0 - if i < 0 || idx >= len(_InvocationSource_index)-1 { - return "InvocationSource(" + strconv.FormatInt(int64(i), 10) + ")" - } - return _InvocationSource_name[_InvocationSource_index[idx]:_InvocationSource_index[idx+1]] -} diff --git a/aws/logs_monitoring_go/internal/parsing/invocation_source_test.go b/aws/logs_monitoring_go/internal/parsing/invocation_source_test.go deleted file mode 100644 index 2a76478b9..000000000 --- a/aws/logs_monitoring_go/internal/parsing/invocation_source_test.go +++ /dev/null @@ -1,52 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. 
- -package parsing - -import ( - "encoding/json" - "testing" -) - -func TestDetectInvocationSource(t *testing.T) { - t.Parallel() - - tests := map[string]struct { - event json.RawMessage - wantKey InvocationSource - }{ - "cloudwatch": { - event: json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`), - wantKey: InvocationSourceCloudwatchLogs, - }, - "s3": { - event: json.RawMessage(`{"Records":[{"s3":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}},"eventSource":"aws:s3"}]}`), - wantKey: InvocationSourceS3, - }, - "empty object": { - event: json.RawMessage(`{}`), - wantKey: InvocationSourceUnknown, - }, - "not JSON": { - event: json.RawMessage(`not a json`), - wantKey: InvocationSourceUnknown, - }, - "empty input": { - event: json.RawMessage(``), - wantKey: InvocationSourceUnknown, - }, - } - - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - t.Parallel() - - got := DetectInvocationSource(tc.event) - if got != tc.wantKey { - t.Errorf("got %d, want %d", got, tc.wantKey) - } - }) - } -} diff --git a/aws/logs_monitoring_go/internal/parsing/parse.go b/aws/logs_monitoring_go/internal/parsing/parse.go new file mode 100644 index 000000000..aa000460a --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/parse.go @@ -0,0 +1,92 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. 
+
+package parsing
+
+import (
+	"bytes"
+	"encoding/json"
+	"errors"
+	"fmt"
+)
+
+// Values of a record's eventSource field that parseRecords can route.
+const (
+	eventSourceS3      = "aws:s3"
+	eventSourceKinesis = "aws:kinesis"
+)
+
+// Parse classifies a raw Lambda invocation payload.
+//
+// It scans the top-level keys in order and dispatches on the first one it
+// recognizes: "awslogs" (CloudWatch Logs), "Records" (S3 or Kinesis, decided
+// by the first record's eventSource) or "detail" (EventBridge). Unrecognized
+// keys are skipped. An error is returned when the payload is not a JSON
+// object, cannot be decoded, or contains none of those keys.
+func Parse(event json.RawMessage) ([]ParsedEvent, error) {
+	dec := json.NewDecoder(bytes.NewReader(event))
+
+	t, err := dec.Token()
+	if err != nil {
+		// Keep the decoder error so malformed input is distinguishable from a
+		// valid JSON value that merely is not an object.
+		return nil, fmt.Errorf("expected JSON object: %w", err)
+	}
+	if t != json.Delim('{') {
+		return nil, errors.New("expected JSON object")
+	}
+
+	for dec.More() {
+		key, err := dec.Token()
+		if err != nil {
+			return nil, fmt.Errorf("read key: %w", err)
+		}
+
+		switch key {
+		case "awslogs":
+			return []ParsedEvent{{ContentTypeCloudwatchLogs, event}}, nil
+		case "Records":
+			return parseRecords(event, dec)
+		case "detail":
+			// parseEventBridge re-reads the whole envelope because "source" and
+			// "detail-type" may have appeared before this key.
+			return parseEventBridge(event)
+		default:
+			// Consume the value so the decoder is positioned on the next key.
+			var skip json.RawMessage
+			if err := dec.Decode(&skip); err != nil {
+				return nil, fmt.Errorf("skip value: %w", err)
+			}
+		}
+	}
+
+	return nil, errors.New("unsupported event")
+}
+
+// parseRecords classifies a "Records" payload from the eventSource (or
+// EventSource) field of its first record. dec must be positioned just after
+// the "Records" key; event is the full payload returned on success. Only the
+// first record is inspected.
+func parseRecords(event json.RawMessage, dec *json.Decoder) ([]ParsedEvent, error) {
+	t, err := dec.Token()
+	if err != nil {
+		return nil, fmt.Errorf("parse records: expected array: %w", err)
+	}
+	if t != json.Delim('[') {
+		return nil, errors.New("parse records: expected array")
+	}
+
+	t, err = dec.Token()
+	if err != nil {
+		return nil, fmt.Errorf("parse records: expected object: %w", err)
+	}
+	if t == json.Delim(']') {
+		// An empty array is well-formed JSON; report it as such.
+		return nil, errors.New("parse records: no records")
+	}
+	if t != json.Delim('{') {
+		return nil, errors.New("parse records: expected object")
+	}
+
+	for dec.More() {
+		key, err := dec.Token()
+		if err != nil {
+			return nil, fmt.Errorf("read record key: %w", err)
+		}
+		if key != "eventSource" && key != "EventSource" {
+			var skip json.RawMessage
+			if err := dec.Decode(&skip); err != nil {
+				return nil, fmt.Errorf("skip record field: %w", err)
+			}
+			continue
+		}
+
+		val, err := dec.Token()
+		if err != nil {
+			return nil, fmt.Errorf("read event source value: %w", err)
+		}
+
+		source, ok := val.(string)
+		if !ok {
+			return nil, fmt.Errorf("eventSource is not a string: %v", val)
+		}
+		switch source {
+		case eventSourceS3:
+			return []ParsedEvent{{ContentTypeS3, event}}, nil
+		case eventSourceKinesis:
+			return []ParsedEvent{{ContentTypeKinesis, event}}, nil
+		default:
+			return nil, fmt.Errorf("parse records: unsupported event source %q", source)
+		}
+	}
+
+	return nil, errors.New("parse records: no eventSource found")
+}
diff --git a/aws/logs_monitoring_go/internal/parsing/parse_test.go b/aws/logs_monitoring_go/internal/parsing/parse_test.go
new file mode 100644
index 000000000..545aeafb5
--- /dev/null
+++ b/aws/logs_monitoring_go/internal/parsing/parse_test.go
@@ -0,0 +1,114 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2026-Present Datadog, Inc.
+
+package parsing
+
+import (
+	"encoding/json"
+	"testing"
+)
+
+func TestParse(t *testing.T) {
+	t.Parallel()
+
+	tests := map[string]struct {
+		event   json.RawMessage
+		want    []ContentType
+		wantErr bool
+	}{
+		"cloudwatch logs": {
+			event: json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`),
+			want:  []ContentType{ContentTypeCloudwatchLogs},
+		},
+		"cloudwatch logs after unrelated key": {
+			event: json.RawMessage(`{"extra":{"a":[1,2]},"awslogs":{"data":"dGVzdA=="}}`),
+			want:  []ContentType{ContentTypeCloudwatchLogs},
+		},
+		"s3": {
+			event: json.RawMessage(`{"Records":[{"eventSource":"aws:s3","s3":{"bucket":{"name":"b"},"object":{"key":"k"}}}]}`),
+			want:  []ContentType{ContentTypeS3},
+		},
+		"kinesis": {
+			event: json.RawMessage(`{"Records":[{"eventSource":"aws:kinesis","kinesis":{"data":"dGVzdA=="}}]}`),
+			want:  []ContentType{ContentTypeKinesis},
+		},
+		"kinesis with eventSource last": {
+			event: json.RawMessage(`{"Records":[{"kinesis":{"data":"dGVzdA=="},"eventSource":"aws:kinesis"}]}`),
+			want:  []ContentType{ContentTypeKinesis},
+		},
+		"eventbridge generic": {
+			event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","detail":{}}`),
+			want:  []ContentType{ContentTypeEventBridge},
+		},
+		"eventbridge s3": {
+			event: json.RawMessage(`{"version":"0","detail-type":"Object Created","source":"aws.s3","detail":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}}}`),
+			want:  []ContentType{ContentTypeS3},
+		},
+		"eventbridge ec2": {
+			event: json.RawMessage(`{"version":"0","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","detail":{"instance-id":"i-123"}}`),
+			want:  []ContentType{ContentTypeEventBridge},
+		},
+		"eventbridge s3 without object created": {
+			event: json.RawMessage(`{"version":"0","detail-type":"Object Deleted","source":"aws.s3","detail":{}}`),
+			want:  []ContentType{ContentTypeEventBridge},
+		},
+		"empty object": {
+			event:   json.RawMessage(`{}`),
+			wantErr: true,
+		},
+		"unsupported source": {
+			event:   json.RawMessage(`{"Records":[{"eventSource":"aws:dynamodb"}]}`),
+			wantErr: true,
+		},
+		"unsupported capitalized EventSource": {
+			event:   json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Message":"m"}}]}`),
+			wantErr: true,
+		},
+		"empty records": {
+			event:   json.RawMessage(`{"Records":[]}`),
+			wantErr: true,
+		},
+		"records not an array": {
+			event:   json.RawMessage(`{"Records":{}}`),
+			wantErr: true,
+		},
+		"not JSON": {
+			event:   json.RawMessage(`not json`),
+			wantErr: true,
+		},
+	}
+
+	for name, tc := range tests {
+		t.Run(name, func(t *testing.T) {
+			t.Parallel()
+
+			got, err := Parse(tc.event)
+
+			if tc.wantErr {
+				if err == nil {
+					t.Fatal("expected error, got nil")
+				}
+				return
+			}
+
+			if err != nil {
+				t.Fatalf("unexpected error: %v", err)
+			}
+
+			if len(got) != len(tc.want) {
+				t.Fatalf("got %d events, want %d", len(got), len(tc.want))
+			}
+
+			for i, pe := range got {
+				if pe.ContentType != tc.want[i] {
+					t.Errorf("event[%d]: got ContentType %v, want %v", i, pe.ContentType, tc.want[i])
+				}
+				if len(pe.Payload) == 0 {
+					t.Errorf("event[%d]: empty payload", i)
+				}
+			}
+		})
+	}
+}
diff --git a/aws/logs_monitoring_go/internal/pipeline/pipeline.go b/aws/logs_monitoring_go/internal/pipeline/pipeline.go
index 28ddf1598..f762ff6c2 100644
--- a/aws/logs_monitoring_go/internal/pipeline/pipeline.go
+++ b/aws/logs_monitoring_go/internal/pipeline/pipeline.go
@@ -7,31 +7,54 @@ package pipeline
 
 import (
 	"context"
-	"encoding/json"
+	"errors"
+	"fmt"
 
+	"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
 	"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding"
+	"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling"
 	"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
+	"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
 	"golang.org/x/sync/errgroup"
 )
 
+// Start runs the pipeline for one invocation: a producer goroutine feeds the
+// log entries of every parsed event, in order, through that event's handler,
+// while a forwarder goroutine consumes them. It returns the first error from
+// either goroutine, or an error if parsedEvents is empty.
 func Start(
 	ctx context.Context,
-	event json.RawMessage,
-	run *Run,
+	parsedEvents []parsing.ParsedEvent,
+	cfg *config.Config,
 ) error {
-	g, ctx := errgroup.WithContext(ctx)
+	if len(parsedEvents) == 0 {
+		return errors.New("no events to process")
+	}
+
+	eg, ctx := errgroup.WithContext(ctx)
 	entries := make(chan model.LogEntry)
 
-	forwarder := forwarding.NewForwarder(run.Cfg, forwarding.Client, run.Storage)
+	// The forwarder is built once per invocation, so its storage is taken from
+	// the first event; events are assumed to share a storage kind — TODO confirm.
+	forwarder := forwarding.NewForwarder(cfg, forwarding.Client, forwarding.StorageFromContentType(parsedEvents[0].ContentType))
 
-	g.Go(func() error {
+	eg.Go(func() error {
 		defer close(entries)
-		return run.Handler.Handle(ctx, event, entries)
+		for _, parsedEvent := range parsedEvents {
+			handler, err := handling.NewHandler(parsedEvent.ContentType, cfg)
+			if err != nil {
+				return fmt.Errorf("new %s handler: %w", parsedEvent.ContentType, err)
+			}
+			if err := handler.Handle(ctx, parsedEvent.Payload, entries); err != nil {
+				return fmt.Errorf("handle %s event: %w", parsedEvent.ContentType, err)
+			}
+		}
+		return nil
 	})
 
-	g.Go(func() error {
+	eg.Go(func() error {
 		return forwarder.Start(ctx, entries)
 	})
 
-	return g.Wait()
+	return eg.Wait()
 }
diff --git a/aws/logs_monitoring_go/internal/pipeline/run.go b/aws/logs_monitoring_go/internal/pipeline/run.go
deleted file mode 100644
index 5aee557a1..000000000
--- a/aws/logs_monitoring_go/internal/pipeline/run.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// Unless explicitly stated otherwise all files in this repository are licensed
-// under the Apache License Version 2.0.
-// This product includes software developed at Datadog (https://www.datadoghq.com/).
-// Copyright 2026-Present Datadog, Inc.
-
-package pipeline
-
-import (
-	"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
-	"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling"
-)
-
-type Run struct {
-	Cfg     *config.Config
-	Handler handling.Handler
-	Storage string
-}
-
-func NewRun(cfg *config.Config, handler handling.Handler, storage string) *Run {
-	return &Run{
-		Cfg:     cfg,
-		Handler: handler,
-		Storage: storage,
-	}
-}