From b2678801e7e9eb63f4dd57e8101852ad7fb20ace Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Sun, 3 May 2026 11:29:32 +0200 Subject: [PATCH 1/9] [AWSINTS-3594] feat(go-forwarder): add CloudTrail --- .../internal/handling/cloudtrail.go | 106 +++++++++ .../internal/handling/cloudtrail_test.go | 219 ++++++++++++++++++ .../internal/handling/cloudwatch.go | 7 + .../internal/handling/cloudwatch_test.go | 74 ++++++ .../internal/handling/s3.go | 44 ++-- .../internal/handling/s3_test.go | 115 +++++++-- .../internal/handling/scanner.go | 17 ++ 7 files changed, 548 insertions(+), 34 deletions(-) create mode 100644 aws/logs_monitoring_go/internal/handling/cloudtrail.go create mode 100644 aws/logs_monitoring_go/internal/handling/cloudtrail_test.go diff --git a/aws/logs_monitoring_go/internal/handling/cloudtrail.go b/aws/logs_monitoring_go/internal/handling/cloudtrail.go new file mode 100644 index 000000000..d4d221d07 --- /dev/null +++ b/aws/logs_monitoring_go/internal/handling/cloudtrail.go @@ -0,0 +1,106 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package handling + +import ( + "compress/gzip" + "encoding/json" + "errors" + "fmt" + "io" + "iter" + "log/slog" + "regexp" + "sync" +) + +const ( + cloudTrailARNKey = "arn" + cloudTrailUserIdentityKey = "userIdentity" +) + +var cloudTrailRegex = sync.OnceValue(func() *regexp.Regexp { + return regexp.MustCompile(`\d+_CloudTrail(|-Digest|-Insight)_\w{2}(|-gov|-cn)-\w{4,9}-\d_(|.+)\d{8}T\d{4,6}Z(|.+)\.json\.gz$`) +}) + +var ec2InstanceRegexp = sync.OnceValue(func() *regexp.Regexp { + return regexp.MustCompile(`^arn:aws:sts::.*?:assumed-role/(?P.*?)/(?Pi-([0-9a-f]{8}|[0-9a-f]{17}))$`) +}) + +func decodeCloudTrail(r io.Reader) iter.Seq[s3Record] { + return func(yield func(s3Record) bool) { + gz, err := gzip.NewReader(r) + if err != nil { + yield(s3Record{Err: fmt.Errorf("decode cloudtrail gzip: %w", err)}) + return + } + defer gz.Close() //nolint:errcheck + + dec := json.NewDecoder(gz) + + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + yield(s3Record{Err: errors.New("decode cloudtrail: expected '{' at start of JSON")}) + return + } + if t, err := dec.Token(); err != nil || t != "Records" { + yield(s3Record{Err: errors.New("decode cloudtrail: expected 'Records' key")}) + return + } + if t, err := dec.Token(); err != nil || t != json.Delim('[') { + yield(s3Record{Err: errors.New("decode cloudtrail: expected '[' at start of Records array")}) + return + } + + for dec.More() { + var record map[string]any + if err := dec.Decode(&record); err != nil { + yield(s3Record{Err: fmt.Errorf("decode cloudtrail record: %w", err)}) + return + } + + msg, err := json.Marshal(record) + if err != nil { + yield(s3Record{Err: fmt.Errorf("marshal cloudtrail record: %w", err)}) + return + } + + host := cloudtrailHost(record) + if !yield(s3Record{Message: string(msg), Host: host}) { + return + } + } + } +} + +func cloudtrailHostFromMessage(message string) string { + var record map[string]any + if err := json.Unmarshal([]byte(message), &record); err != nil { + return "" + } + return cloudtrailHost(record) +} + +func cloudtrailHost(record map[string]any) string { + ui, ok := record[cloudTrailUserIdentityKey].(map[string]any) + if !ok { + slog.Debug(cloudTrailUserIdentityKey + " key not found, cloudtrail host extraction skipped") + return "" + } + arn, ok := ui[cloudTrailARNKey].(string) + if !ok { + slog.Debug(cloudTrailARNKey + " key not found, cloudtrail host extraction skipped") + return "" + } + + re := ec2InstanceRegexp() + matches := re.FindStringSubmatch(arn) + if matches == nil { + slog.Debug(arn + " arn did not match, cloudtrail host extraction skipped") + return "" + } + + return matches[re.SubexpIndex("host")] +} diff --git a/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go b/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go new file mode 100644 index 000000000..b798602c2 --- /dev/null +++ b/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go @@ -0,0 +1,219 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package handling + +import ( + "bytes" + "testing" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil" + "github.com/google/go-cmp/cmp" +) + +func TestCloudTrailRegex(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + key string + want bool + }{ + "standard cloudtrail": { + key: "601427279990_CloudTrail_us-east-1_20210503T0000Z_QrttGEk4ZcBTLwj5.json.gz", + want: true, + }, + "cloudtrail digest": { + key: "601427279990_CloudTrail-Digest_us-east-1_20210503T0000Z_digest.json.gz", + want: true, + }, + "cloudtrail insight": { + key: "601427279990_CloudTrail-Insight_us-east-1_20210503T0000Z_insight.json.gz", + want: true, + }, + "gov region": { + key: "601427279990_CloudTrail_us-gov-west-1_20210503T0000Z_abc.json.gz", + want: true, + }, + "cn region": { + key: "601427279990_CloudTrail_cn-north-1_20210503T0000Z_abc.json.gz", + want: true, + }, + "not cloudtrail": { + key: "some-random-log-file.json.gz", + want: false, + }, + "waf log": { + key: "aws-waf-logs-something.json.gz", + want: false, + }, + "plain text file": { + key: "access.log", + want: false, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + got := cloudTrailRegex().MatchString(tc.key) + if got != tc.want { + t.Errorf("cloudTrailRegex().MatchString(%q) = %v, want %v", tc.key, got, tc.want) + } + }) + } +} + +func TestCloudtrailHost(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + record map[string]any + want string + }{ + "ec2 instance (17)": { + record: map[string]any{ + "userIdentity": map[string]any{ + "arn": "arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d", + }, + }, + want: "i-08014e4f62ccf762d", + }, + "ec2 instance (8)": { + record: map[string]any{ + "userIdentity": map[string]any{ + "arn": "arn:aws:sts::601427279990:assumed-role/MyRole/i-abcd1234", + }, + }, + want: "i-abcd1234", + }, + "non ec2 arn": { + record: map[string]any{ + "userIdentity": map[string]any{ + "arn": "arn:aws:sts::601427279990:assumed-role/MyRole/my-session", + }, + }, + want: "", + }, + "missing userIdentity": { + record: map[string]any{"eventName": "DescribeTable"}, + want: "", + }, + "missing arn": { + record: map[string]any{ + "userIdentity": map[string]any{ + "type": "AssumedRole", + }, + }, + want: "", + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + host := cloudtrailHost(tc.record) + if host != tc.want { + t.Errorf("want %q, got %q", tc.want, host) + } + }) + } +} + +func TestDecodeCloudTrail(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + input []byte + want []s3Record + wantErr bool + }{ + "single record with ec2 host": { + input: testutil.MustGzipJSON(t, map[string]any{ + "Records": []any{ + map[string]any{ + "eventName": "DescribeTable", + "userIdentity": map[string]any{ + "arn": "arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d", + }, + }, + }, + }), + want: []s3Record{ + { + Message: `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`, + Host: "i-08014e4f62ccf762d", + }, + }, + }, + "single record without ec2 host": { + input: testutil.MustGzipJSON(t, map[string]any{ + "Records": []any{ + map[string]any{ + "eventName": "DescribeTable", + "userIdentity": map[string]any{ + "arn": "arn:aws:iam::601427279990:user/admin", + }, + }, + }, + }), + want: []s3Record{ + { + Message: `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:iam::601427279990:user/admin"}}`, + Host: "", + }, + }, + }, + "multiple records": { + input: testutil.MustGzipJSON(t, map[string]any{ + "Records": []any{ + map[string]any{"eventName": "event1"}, + map[string]any{"eventName": "event2"}, + }, + }), + want: []s3Record{ + {Message: `{"eventName":"event1"}`}, + {Message: `{"eventName":"event2"}`}, + }, + }, + "empty records array": { + input: testutil.MustGzipJSON(t, map[string]any{ + "Records": []any{}, + }), + want: nil, + }, + "invalid gzip": { + input: []byte("not gzip"), + wantErr: true, + }, + "invalid json": { + input: testutil.MustGzipJSON(t, "not an object"), + wantErr: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + var got []s3Record + for rec := range decodeCloudTrail(bytes.NewReader(tc.input)) { + if rec.Err != nil { + if !tc.wantErr { + t.Fatalf("unexpected error: %v", rec.Err) + } + return + } + got = append(got, rec) + } + + if tc.wantErr { + t.Fatal("expected error, got none") + } + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Errorf("mismatch (-want +got):\n%s", diff) + } + }) + } +} diff --git a/aws/logs_monitoring_go/internal/handling/cloudwatch.go b/aws/logs_monitoring_go/internal/handling/cloudwatch.go index 913eabe55..65fb9c6bd 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudwatch.go +++ b/aws/logs_monitoring_go/internal/handling/cloudwatch.go @@ -134,6 +134,13 @@ func (h CloudwatchHandler) newCloudwatchLogEntry(event events.CloudwatchLogsLogE entry.Message = message entry.ID = event.ID entry.Timestamp = event.Timestamp + + if entry.Source == sourceCloudtrail { + if host := cloudtrailHostFromMessage(event.Message); host != "" { + entry.Host = host + } + } + return entry } diff --git a/aws/logs_monitoring_go/internal/handling/cloudwatch_test.go b/aws/logs_monitoring_go/internal/handling/cloudwatch_test.go index bb6c4c4ab..65f96a249 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudwatch_test.go +++ b/aws/logs_monitoring_go/internal/handling/cloudwatch_test.go @@ -118,6 +118,80 @@ func TestCloudwatchHandler_Handle(t *testing.T) { }, }, }, + "cloudtrail with ec2 host": { + event: testutil.MustCloudwatchEvent(t, testutil.MustGzipJSON(t, map[string]any{ + "messageType": "DATA_MESSAGE", + "owner": "601427279990", + "logGroup": "cloudtrail-logs", + "logStream": "601427279990_CloudTrail_us-east-1", + "logEvents": []map[string]any{ + { + "id": "ct1", + "timestamp": 1620000000000, + "message": `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`, + }, + }, + })), + config: testutil.EmptyConfig(), + chanSize: 1, + want: []model.LogEntry{ + { + Message: `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`, + Source: "cloudtrail", + SourceCategory: "aws", + Service: "cloudtrail", + Tags: model.Tags{"service:cloudtrail"}, + Host: "i-08014e4f62ccf762d", + ID: "ct1", + Timestamp: 1620000000000, + Metadata: model.CloudwatchMetadata{ + LambdaOrigin: model.LambdaOrigin{ARN: testutil.ARN}, + Origin: model.CloudwatchOrigin{ + LogGroup: "cloudtrail-logs", + LogStream: "601427279990_CloudTrail_us-east-1", + Owner: "601427279990", + }, + }, + }, + }, + }, + "cloudtrail without ec2 host": { + event: testutil.MustCloudwatchEvent(t, testutil.MustGzipJSON(t, map[string]any{ + "messageType": "DATA_MESSAGE", + "owner": "601427279990", + "logGroup": "cloudtrail-logs", + "logStream": "601427279990_CloudTrail_us-east-1", + "logEvents": []map[string]any{ + { + "id": "ct2", + "timestamp": 1620000000000, + "message": `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:iam::601427279990:user/admin"}}`, + }, + }, + })), + config: testutil.EmptyConfig(), + chanSize: 1, + want: []model.LogEntry{ + { + Message: `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:iam::601427279990:user/admin"}}`, + Source: "cloudtrail", + SourceCategory: "aws", + Service: "cloudtrail", + Tags: model.Tags{"service:cloudtrail"}, + Host: "cloudtrail-logs", + ID: "ct2", + Timestamp: 1620000000000, + Metadata: model.CloudwatchMetadata{ + LambdaOrigin: model.LambdaOrigin{ARN: testutil.ARN}, + Origin: model.CloudwatchOrigin{ + LogGroup: "cloudtrail-logs", + LogStream: "601427279990_CloudTrail_us-east-1", + Owner: "601427279990", + }, + }, + }, + }, + }, "config overrides source, host, service and tags": { event: testutil.MustCloudwatchEvent(t, testutil.MustGzipJSON(t, map[string]any{ "messageType": "DATA_MESSAGE", diff --git a/aws/logs_monitoring_go/internal/handling/s3.go b/aws/logs_monitoring_go/internal/handling/s3.go index 955d52774..3d9a92eef 100644 --- a/aws/logs_monitoring_go/internal/handling/s3.go +++ b/aws/logs_monitoring_go/internal/handling/s3.go @@ -10,6 +10,7 @@ import ( "context" "encoding/json" "fmt" + "iter" "log/slog" "slices" "strings" @@ -27,6 +28,12 @@ const ( s3KeyCloudtrail = "_CloudTrail_" ) +type s3Record struct { + Message string + Host string + Err error +} + type S3Handler struct { cfg *config.Config } @@ -53,17 +60,17 @@ func (h S3Handler) Handle(ctx context.Context, event json.RawMessage, out chan<- return err } - for _, record := range s3Event.Records { - if err := h.processRecord(ctx, client, out, record, lambdaOrigin); err != nil { + for _, eventRecord := range s3Event.Records { + if err := h.processRecord(ctx, client, out, eventRecord, lambdaOrigin); err != nil { return err } } return nil } -func (h S3Handler) processRecord(ctx context.Context, client S3APIClient, out chan<- model.LogEntry, record events.S3EventRecord, lambdaOrigin model.LambdaOrigin) error { - bucket := record.S3.Bucket.Name - key := record.S3.Object.URLDecodedKey +func (h S3Handler) processRecord(ctx context.Context, client S3APIClient, out chan<- model.LogEntry, eventRecord events.S3EventRecord, lambdaOrigin model.LambdaOrigin) error { + bucket := eventRecord.S3.Bucket.Name + key := eventRecord.S3.Object.URLDecodedKey body, err := getS3Object(ctx, client, bucket, key) if err != nil { @@ -75,32 +82,37 @@ func (h S3Handler) processRecord(ctx context.Context, client S3APIClient, out ch } }() - scanner := NewScanner(body, h.cfg.S3MultilineLogRegex) - for scanner.Scan() { - message := strings.ToValidUTF8(scanner.Text(), "") - entry := h.newS3LogEntry(record, message, lambdaOrigin) + var records iter.Seq[s3Record] + if cloudTrailRegex().MatchString(key) { + records = decodeCloudTrail(body) + } else { + records = scan(body, h.cfg.S3MultilineLogRegex) + } + + for rec := range records { + if rec.Err != nil { + return rec.Err + } + entry := h.newS3LogEntry(eventRecord, rec.Message, lambdaOrigin) if h.cfg.Filter.ShouldExclude(entry.Message) { continue } + entry.Host = cmp.Or(h.cfg.Host, rec.Host) entry.Message = h.cfg.Scrubber.Scrub(entry.Message) if err := concurrent.SafeSender(ctx, out, entry); err != nil { return err } } - - if err := scanner.Err(); err != nil { - return err - } return nil } -func (h S3Handler) newS3LogEntry(record events.S3EventRecord, message string, lambdaOrigin model.LambdaOrigin) model.LogEntry { - key := record.S3.Object.URLDecodedKey +func (h S3Handler) newS3LogEntry(eventRecord events.S3EventRecord, message string, lambdaOrigin model.LambdaOrigin) model.LogEntry { + key := eventRecord.S3.Object.URLDecodedKey metadata := model.S3Metadata{ LambdaOrigin: lambdaOrigin, Origin: model.S3Origin{ - Bucket: record.S3.Bucket.Name, + Bucket: eventRecord.S3.Bucket.Name, Key: key, }, } diff --git a/aws/logs_monitoring_go/internal/handling/s3_test.go b/aws/logs_monitoring_go/internal/handling/s3_test.go index cca3f9158..0726d569f 100644 --- a/aws/logs_monitoring_go/internal/handling/s3_test.go +++ b/aws/logs_monitoring_go/internal/handling/s3_test.go @@ -6,6 +6,7 @@ package handling import ( + "bytes" "errors" "io" "regexp" @@ -21,12 +22,22 @@ import ( "go.uber.org/mock/gomock" ) -var testS3Record = events.S3EventRecord{ - S3: events.S3Entity{ - Bucket: events.S3Bucket{Name: "b"}, - Object: events.S3Object{URLDecodedKey: "k"}, - }, -} +var ( + testS3EventRecord = events.S3EventRecord{ + S3: events.S3Entity{ + Bucket: events.S3Bucket{Name: "b"}, + Object: events.S3Object{URLDecodedKey: "k"}, + }, + } + + testCloudTrailKey = "601427279990_CloudTrail_us-east-1_20210503T0000Z_QrttGEk4ZcBTLwj5.json.gz" + testCloudTrailEventRecord = events.S3EventRecord{ + S3: events.S3Entity{ + Bucket: events.S3Bucket{Name: "trail-bucket"}, + Object: events.S3Object{URLDecodedKey: testCloudTrailKey}, + }, + } +) func TestProcessS3Record(t *testing.T) { t.Parallel() @@ -34,8 +45,8 @@ func TestProcessS3Record(t *testing.T) { tests := map[string]struct { mockSetup func(m *MockS3APIClient) cfg *config.Config - chanSize int - want []model.LogEntry + eventRecord events.S3EventRecord + want []model.LogEntry wantErr bool }{ "single line": { @@ -56,8 +67,8 @@ func TestProcessS3Record(t *testing.T) { Body: io.NopCloser(strings.NewReader("line1\nline2\nline3")), }, nil) }, - cfg: testutil.EmptyConfig(), - chanSize: 3, + cfg: testutil.EmptyConfig(), + eventRecord: testS3EventRecord, want: []model.LogEntry{ wantS3Entry("line1", "s3", "s3", nil), wantS3Entry("line2", "s3", "s3", nil), @@ -71,16 +82,18 @@ func TestProcessS3Record(t *testing.T) { Body: io.NopCloser(strings.NewReader("")), }, nil) }, - cfg: testutil.EmptyConfig(), - want: nil, + cfg: testutil.EmptyConfig(), + eventRecord: testS3EventRecord, + want: nil, }, "s3 error": { mockSetup: func(m *MockS3APIClient) { m.EXPECT().GetObject(gomock.Any(), gomock.Any()). Return(nil, errors.New("access denied")) }, - cfg: testutil.EmptyConfig(), - wantErr: true, + cfg: testutil.EmptyConfig(), + eventRecord: testS3EventRecord, + wantErr: true, }, "ddtags extraction": { mockSetup: func(m *MockS3APIClient) { @@ -111,8 +124,8 @@ func TestProcessS3Record(t *testing.T) { Body: io.NopCloser(strings.NewReader("2024-01-15 ERROR NullPointer\n at com.foo.Bar\n2024-01-15 INFO started")), }, nil) }, - cfg: &config.Config{S3MultilineLogRegex: regexp.MustCompile(`\d{4}-\d{2}-\d{2}`)}, - chanSize: 2, + cfg: &config.Config{S3MultilineLogRegex: regexp.MustCompile(`\d{4}-\d{2}-\d{2}`)}, + eventRecord: testS3EventRecord, want: []model.LogEntry{ wantS3Entry("2024-01-15 ERROR NullPointer\n at com.foo.Bar\n", "s3", "s3", nil), wantS3Entry("2024-01-15 INFO started", "s3", "s3", nil), @@ -140,6 +153,58 @@ func TestProcessS3Record(t *testing.T) { chanSize: 1, want: []model.LogEntry{wantS3Entry("line1", "s3", "s3", model.Tags{"env:prod", "team:aws"})}, }, + "cloudtrail with ec2 host": { + mockSetup: func(m *MockS3APIClient) { + data := testutil.MustGzipJSON(t, map[string]any{ + "Records": []any{ + map[string]any{ + "eventName": "DescribeTable", + "userIdentity": map[string]any{ + "arn": "arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d", + }, + }, + }, + }) + m.EXPECT().GetObject(gomock.Any(), gomock.Any()). + Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader(data)), + }, nil) + }, + cfg: testutil.EmptyConfig(), + eventRecord: testCloudTrailEventRecord, + want: []model.LogEntry{ + wantCloudTrailEntry( + `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`, + "i-08014e4f62ccf762d", + ), + }, + }, + "cloudtrail without ec2 host": { + mockSetup: func(m *MockS3APIClient) { + data := testutil.MustGzipJSON(t, map[string]any{ + "Records": []any{ + map[string]any{ + "eventName": "DescribeTable", + "userIdentity": map[string]any{ + "arn": "arn:aws:iam::601427279990:user/admin", + }, + }, + }, + }) + m.EXPECT().GetObject(gomock.Any(), gomock.Any()). + Return(&s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader(data)), + }, nil) + }, + cfg: testutil.EmptyConfig(), + eventRecord: testCloudTrailEventRecord, + want: []model.LogEntry{ + wantCloudTrailEntry( + `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:iam::601427279990:user/admin"}}`, + "", + ), + }, + }, } for name, tc := range tests { @@ -150,10 +215,10 @@ func TestProcessS3Record(t *testing.T) { mock := NewMockS3APIClient(ctrl) tc.mockSetup(mock) - out := make(chan model.LogEntry, tc.chanSize) + out := make(chan model.LogEntry, len(tc.want)) handler := NewS3(tc.cfg) - err := handler.processRecord(t.Context(), mock, out, testS3Record, testutil.LambdaOrigin()) + err := handler.processRecord(t.Context(), mock, out, tc.eventRecord, testutil.LambdaOrigin()) close(out) var got []model.LogEntry @@ -177,6 +242,20 @@ func TestProcessS3Record(t *testing.T) { } } +func wantCloudTrailEntry(message, host string) model.LogEntry { + entry := model.NewLogEntry() + entry.Message = message + entry.Host = host + entry.Source = sourceCloudtrail + entry.Service = sourceCloudtrail + entry.Tags = model.Tags{"service:" + sourceCloudtrail} + entry.Metadata = model.S3Metadata{ + LambdaOrigin: testutil.LambdaOrigin(), + Origin: model.S3Origin{Bucket: "trail-bucket", Key: testCloudTrailKey}, + } + return entry +} + func wantS3Entry(message, source, service string, tags model.Tags) model.LogEntry { entry := model.NewLogEntry() entry.Message = message diff --git a/aws/logs_monitoring_go/internal/handling/scanner.go b/aws/logs_monitoring_go/internal/handling/scanner.go index 60cd249e8..6c906e11c 100644 --- a/aws/logs_monitoring_go/internal/handling/scanner.go +++ b/aws/logs_monitoring_go/internal/handling/scanner.go @@ -9,7 +9,9 @@ import ( "bufio" "bytes" "io" + "iter" "regexp" + "strings" ) const ( @@ -77,3 +79,18 @@ func (s *Scanner) splitOnLines(data []byte, atEOF bool) (advance int, token []by return 0, nil, nil } + +func scan(r io.Reader, re *regexp.Regexp) iter.Seq[s3Record] { + return func(yield func(s3Record) bool) { + scanner := NewScanner(r, re) + for scanner.Scan() { + message := strings.ToValidUTF8(scanner.Text(), "") + if !yield(s3Record{Message: message}) { + return + } + } + if err := scanner.Err(); err != nil { + yield(s3Record{Err: err}) + } + } +} From d71f41cfdb9f9cc83f5c017fc284a7ec7b9ac54f Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Tue, 5 May 2026 11:16:54 +0200 Subject: [PATCH 2/9] Switch from iter.Seq to iter.Seq2 without error in s3Record struct --- .../internal/handling/cloudtrail.go | 20 +++++++++---------- .../internal/handling/cloudtrail_test.go | 6 +++--- .../internal/handling/s3.go | 9 ++++----- .../internal/handling/scanner.go | 8 ++++---- 4 files changed, 21 insertions(+), 22 deletions(-) diff --git a/aws/logs_monitoring_go/internal/handling/cloudtrail.go b/aws/logs_monitoring_go/internal/handling/cloudtrail.go index d4d221d07..c558e653f 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudtrail.go +++ b/aws/logs_monitoring_go/internal/handling/cloudtrail.go @@ -30,11 +30,11 @@ var ec2InstanceRegexp = sync.OnceValue(func() *regexp.Regexp { return regexp.MustCompile(`^arn:aws:sts::.*?:assumed-role/(?P.*?)/(?Pi-([0-9a-f]{8}|[0-9a-f]{17}))$`) }) -func decodeCloudTrail(r io.Reader) iter.Seq[s3Record] { - return func(yield func(s3Record) bool) { +func decodeCloudTrail(r io.Reader) iter.Seq2[s3Record, error] { + return func(yield func(s3Record, error) bool) { gz, err := gzip.NewReader(r) if err != nil { - yield(s3Record{Err: fmt.Errorf("decode cloudtrail gzip: %w", err)}) + yield(s3Record{}, fmt.Errorf("decode cloudtrail gzip: %w", err)) return } defer gz.Close() //nolint:errcheck @@ -42,33 +42,33 @@ func decodeCloudTrail(r io.Reader) iter.Seq[s3Record] { dec := json.NewDecoder(gz) if t, err := dec.Token(); err != nil || t != json.Delim('{') { - yield(s3Record{Err: errors.New("decode cloudtrail: expected '{' at start of JSON")}) + yield(s3Record{}, errors.New("decode cloudtrail: expected '{' at start of JSON")) return } if t, err := dec.Token(); err != nil || t != "Records" { - yield(s3Record{Err: errors.New("decode cloudtrail: expected 'Records' key")}) + yield(s3Record{}, errors.New("decode cloudtrail: expected 'Records' key")) return } if t, err := dec.Token(); err != nil || t != json.Delim('[') { - yield(s3Record{Err: errors.New("decode cloudtrail: expected '[' at start of Records array")}) + yield(s3Record{}, errors.New("decode cloudtrail: expected '[' at start of Records array")) return } for dec.More() { var record map[string]any if err := dec.Decode(&record); err != nil { - yield(s3Record{Err: fmt.Errorf("decode cloudtrail record: %w", err)}) + yield(s3Record{}, fmt.Errorf("decode cloudtrail record: %w", err)) return } msg, err := json.Marshal(record) if err != nil { - yield(s3Record{Err: fmt.Errorf("marshal cloudtrail record: %w", err)}) + yield(s3Record{}, fmt.Errorf("marshal cloudtrail record: %w", err)) return } host := cloudtrailHost(record) - if !yield(s3Record{Message: string(msg), Host: host}) { + if !yield(s3Record{Message: string(msg), Host: host}, nil) { return } } @@ -98,7 +98,7 @@ func cloudtrailHost(record map[string]any) string { re := ec2InstanceRegexp() matches := re.FindStringSubmatch(arn) if matches == nil { - slog.Debug(arn + " arn did not match, cloudtrail host extraction skipped") + slog.Debug(arn + " arn did not match an EC2 host instance, cloudtrail host extraction skipped") return "" } diff --git a/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go b/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go index b798602c2..9f94895fb 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go +++ b/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go @@ -198,10 +198,10 @@ func TestDecodeCloudTrail(t *testing.T) { t.Parallel() var got []s3Record - for rec := range decodeCloudTrail(bytes.NewReader(tc.input)) { - if rec.Err != nil { + for rec, err := range decodeCloudTrail(bytes.NewReader(tc.input)) { + if err != nil { if !tc.wantErr { - t.Fatalf("unexpected error: %v", rec.Err) + t.Fatalf("unexpected error: %v", err) } return } diff --git a/aws/logs_monitoring_go/internal/handling/s3.go b/aws/logs_monitoring_go/internal/handling/s3.go index 3d9a92eef..7914be97d 100644 --- a/aws/logs_monitoring_go/internal/handling/s3.go +++ b/aws/logs_monitoring_go/internal/handling/s3.go @@ -31,7 +31,6 @@ const ( type s3Record struct { Message string Host string - Err error } type S3Handler struct { @@ -82,16 +81,16 @@ func (h S3Handler) processRecord(ctx context.Context, client S3APIClient, out ch } }() - var records iter.Seq[s3Record] + var records iter.Seq2[s3Record, error] if cloudTrailRegex().MatchString(key) { records = decodeCloudTrail(body) } else { records = scan(body, h.cfg.S3MultilineLogRegex) } - for rec := range records { - if rec.Err != nil { - return rec.Err + for rec, err := range records { + if err != nil { + return err } entry := h.newS3LogEntry(eventRecord, rec.Message, lambdaOrigin) if h.cfg.Filter.ShouldExclude(entry.Message) { diff --git a/aws/logs_monitoring_go/internal/handling/scanner.go b/aws/logs_monitoring_go/internal/handling/scanner.go index 6c906e11c..4769203ce 100644 --- a/aws/logs_monitoring_go/internal/handling/scanner.go +++ b/aws/logs_monitoring_go/internal/handling/scanner.go @@ -80,17 +80,17 @@ func (s *Scanner) splitOnLines(data []byte, atEOF bool) (advance int, token []by return 0, nil, nil } -func scan(r io.Reader, re *regexp.Regexp) iter.Seq[s3Record] { - return func(yield func(s3Record) bool) { +func scan(r io.Reader, re *regexp.Regexp) iter.Seq2[s3Record, error] { + return func(yield func(s3Record, error) bool) { scanner := NewScanner(r, re) for scanner.Scan() { message := strings.ToValidUTF8(scanner.Text(), "") - if !yield(s3Record{Message: message}) { + if !yield(s3Record{Message: message}, nil) { return } } if err := scanner.Err(); err != nil { - yield(s3Record{Err: err}) + yield(s3Record{}, err) } } } From a9c51067b60e74f8e3ad5411ad8ccd60f18be606 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Tue, 5 May 2026 13:16:56 +0200 Subject: [PATCH 3/9] Delete s3Record type and simplify cloudtrail logic --- .../internal/handling/cloudtrail.go | 97 +++++++++++-------- .../internal/handling/cloudtrail_test.go | 94 ++++++------------ .../internal/handling/cloudwatch.go | 4 +- .../internal/handling/s3.go | 19 ++-- .../internal/handling/scanner.go | 8 +- 5 files changed, 104 insertions(+), 118 deletions(-) diff --git a/aws/logs_monitoring_go/internal/handling/cloudtrail.go b/aws/logs_monitoring_go/internal/handling/cloudtrail.go index c558e653f..26bf223e3 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudtrail.go +++ b/aws/logs_monitoring_go/internal/handling/cloudtrail.go @@ -14,6 +14,7 @@ import ( "iter" "log/slog" "regexp" + "strings" "sync" ) @@ -30,11 +31,11 @@ var ec2InstanceRegexp = sync.OnceValue(func() *regexp.Regexp { return regexp.MustCompile(`^arn:aws:sts::.*?:assumed-role/(?P.*?)/(?Pi-([0-9a-f]{8}|[0-9a-f]{17}))$`) }) -func decodeCloudTrail(r io.Reader) iter.Seq2[s3Record, error] { - return func(yield func(s3Record, error) bool) { +func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] { + return func(yield func(string, error) bool) { gz, err := gzip.NewReader(r) if err != nil { - yield(s3Record{}, fmt.Errorf("decode cloudtrail gzip: %w", err)) + yield("", fmt.Errorf("decode cloudtrail gzip: %w", err)) return } defer gz.Close() //nolint:errcheck @@ -42,65 +43,81 @@ func decodeCloudTrail(r io.Reader) iter.Seq2[s3Record, error] { dec := json.NewDecoder(gz) if t, err := dec.Token(); err != nil || t != json.Delim('{') { - yield(s3Record{}, errors.New("decode cloudtrail: expected '{' at start of JSON")) + yield("", errors.New("decode cloudtrail: expected '{' at start of JSON")) return } if t, err := dec.Token(); err != nil || t != "Records" { - yield(s3Record{}, errors.New("decode cloudtrail: expected 'Records' key")) + yield("", errors.New("decode cloudtrail: expected 'Records' key")) return } if t, err := dec.Token(); err != nil || t != json.Delim('[') { - yield(s3Record{}, errors.New("decode cloudtrail: expected '[' at start of Records array")) + yield("", errors.New("decode cloudtrail: expected '[' at start of Records array")) return } for dec.More() { - var record map[string]any - if err := dec.Decode(&record); err != nil { - yield(s3Record{}, fmt.Errorf("decode cloudtrail record: %w", err)) + var raw json.RawMessage + if err := dec.Decode(&raw); err != nil { + yield("", fmt.Errorf("decode cloudtrail record: %w", err)) return } - - msg, err := json.Marshal(record) - if err != nil { - yield(s3Record{}, fmt.Errorf("marshal cloudtrail record: %w", err)) - return - } - - host := cloudtrailHost(record) - if !yield(s3Record{Message: string(msg), Host: host}, nil) { + if !yield(string(raw), nil) { return } } } } -func cloudtrailHostFromMessage(message string) string { - var record map[string]any - if err := json.Unmarshal([]byte(message), &record); err != nil { +func cloudtrailHost(message string) string { + dec := json.NewDecoder(strings.NewReader(message)) + if t, err := dec.Token(); err != nil || t != json.Delim('{') { return "" } - return cloudtrailHost(record) -} -func cloudtrailHost(record map[string]any) string { - ui, ok := record[cloudTrailUserIdentityKey].(map[string]any) - if !ok { - slog.Debug(cloudTrailUserIdentityKey + " key not found, cloudtrail host extraction skipped") - return "" - } - arn, ok := ui[cloudTrailARNKey].(string) - if !ok { - slog.Debug(cloudTrailARNKey + " key not found, cloudtrail host extraction skipped") - return "" - } + for dec.More() { + key, err := dec.Token() + if err != nil { + return "" + } + if key != cloudTrailUserIdentityKey { + var skip json.RawMessage + if err := dec.Decode(&skip); err != nil { + return "" + } + continue + } - re := ec2InstanceRegexp() - matches := re.FindStringSubmatch(arn) - if matches == nil { - slog.Debug(arn + " arn did not match an EC2 host instance, cloudtrail host extraction skipped") + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + return "" + } + + for dec.More() { + innerKey, err := dec.Token() + if err != nil { + return "" + } + if innerKey != cloudTrailARNKey { + var skip json.RawMessage + if err := dec.Decode(&skip); err != nil { + return "" + } + continue + } + + var arn string + if err := dec.Decode(&arn); err != nil { + return "" + } + + re := ec2InstanceRegexp() + matches := re.FindStringSubmatch(arn) + if matches == nil { + slog.Debug(arn + " arn did not match an EC2 host instance, cloudtrail host extraction skipped") + return "" + } + return matches[re.SubexpIndex("host")] + } return "" } - - return matches[re.SubexpIndex("host")] + return "" } diff --git a/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go b/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go index 9f94895fb..27ebdf026 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go +++ b/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go @@ -69,53 +69,44 @@ func TestCloudtrailHost(t *testing.T) { t.Parallel() tests := map[string]struct { - record map[string]any - want string + message string + want string }{ "ec2 instance (17)": { - record: map[string]any{ - "userIdentity": map[string]any{ - "arn": "arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d", - }, - }, - want: "i-08014e4f62ccf762d", + message: `{"userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`, + want: "i-08014e4f62ccf762d", }, "ec2 instance (8)": { - record: map[string]any{ - "userIdentity": map[string]any{ - "arn": "arn:aws:sts::601427279990:assumed-role/MyRole/i-abcd1234", - }, - }, - want: "i-abcd1234", + message: `{"userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-abcd1234"}}`, + want: "i-abcd1234", }, "non ec2 arn": { - record: map[string]any{ - "userIdentity": map[string]any{ - "arn": "arn:aws:sts::601427279990:assumed-role/MyRole/my-session", - }, - }, - want: "", + message: `{"userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/my-session"}}`, + want: "", }, "missing userIdentity": { - record: map[string]any{"eventName": "DescribeTable"}, - want: "", + message: `{"eventName":"DescribeTable"}`, + want: "", }, "missing arn": { - record: map[string]any{ - "userIdentity": map[string]any{ - "type": "AssumedRole", - }, - }, - want: "", + message: `{"userIdentity":{"type":"AssumedRole"}}`, + want: "", + }, + "invalid json": { + message: "not json", + want: "", + }, + "empty message": { + message: "", + want: "", }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { t.Parallel() - host := cloudtrailHost(tc.record) - if host != tc.want { - t.Errorf("want %q, got %q", tc.want, host) + if got := cloudtrailHost(tc.message); got != tc.want { + t.Errorf("want %q, got %q", tc.want, got) } }) } @@ -126,10 +117,10 @@ func TestDecodeCloudTrail(t *testing.T) { tests := map[string]struct { input []byte - want []s3Record + want []string wantErr bool }{ - "single record with ec2 host": { + "single record": { input: testutil.MustGzipJSON(t, map[string]any{ "Records": []any{ map[string]any{ @@ -140,29 +131,8 @@ func TestDecodeCloudTrail(t *testing.T) { }, }, }), - want: []s3Record{ - { - Message: `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`, - Host: "i-08014e4f62ccf762d", - }, - }, - }, - "single record without ec2 host": { - input: testutil.MustGzipJSON(t, map[string]any{ - "Records": []any{ - map[string]any{ - "eventName": "DescribeTable", - "userIdentity": map[string]any{ - "arn": "arn:aws:iam::601427279990:user/admin", - }, - }, - }, - }), - want: []s3Record{ - { - Message: `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:iam::601427279990:user/admin"}}`, - Host: "", - }, + want: []string{ + `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`, }, }, "multiple records": { @@ -172,9 +142,9 @@ func TestDecodeCloudTrail(t *testing.T) { map[string]any{"eventName": "event2"}, }, }), - want: []s3Record{ - {Message: `{"eventName":"event1"}`}, - {Message: `{"eventName":"event2"}`}, + want: []string{ + `{"eventName":"event1"}`, + `{"eventName":"event2"}`, }, }, "empty records array": { @@ -197,15 +167,15 @@ func TestDecodeCloudTrail(t *testing.T) { t.Run(name, func(t *testing.T) { t.Parallel() - var got []s3Record - for rec, err := range decodeCloudTrail(bytes.NewReader(tc.input)) { + var got []string + for msg, err := range decodeCloudTrail(bytes.NewReader(tc.input)) { if err != nil { if !tc.wantErr { t.Fatalf("unexpected error: %v", err) } return } - got = append(got, rec) + got = append(got, msg) } if tc.wantErr { diff --git a/aws/logs_monitoring_go/internal/handling/cloudwatch.go b/aws/logs_monitoring_go/internal/handling/cloudwatch.go index 65fb9c6bd..4e5dafd8a 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudwatch.go +++ b/aws/logs_monitoring_go/internal/handling/cloudwatch.go @@ -135,8 +135,8 @@ func (h CloudwatchHandler) newCloudwatchLogEntry(event events.CloudwatchLogsLogE entry.ID = event.ID entry.Timestamp = event.Timestamp - if entry.Source == sourceCloudtrail { - if host := cloudtrailHostFromMessage(event.Message); host != "" { + if h.cfg.Host == "" && entry.Source == sourceCloudtrail { + if host := cloudtrailHost(event.Message); host != "" { entry.Host = host } } diff --git a/aws/logs_monitoring_go/internal/handling/s3.go b/aws/logs_monitoring_go/internal/handling/s3.go index 7914be97d..4c44570d8 100644 --- a/aws/logs_monitoring_go/internal/handling/s3.go +++ b/aws/logs_monitoring_go/internal/handling/s3.go @@ -28,11 +28,6 @@ const ( s3KeyCloudtrail = "_CloudTrail_" ) -type s3Record struct { - Message string - Host string -} - type S3Handler struct { cfg *config.Config } @@ -81,23 +76,27 @@ func (h S3Handler) processRecord(ctx context.Context, client S3APIClient, out ch } }() - var records iter.Seq2[s3Record, error] - if cloudTrailRegex().MatchString(key) { + var records iter.Seq2[string, error] + isCloudTrail := cloudTrailRegex().MatchString(key) + if isCloudTrail { records = decodeCloudTrail(body) } else { records = scan(body, h.cfg.S3MultilineLogRegex) } - for rec, err := range records { + for message, err := range records { if err != nil { return err } - entry := h.newS3LogEntry(eventRecord, rec.Message, lambdaOrigin) + entry := h.newS3LogEntry(eventRecord, message, lambdaOrigin) if h.cfg.Filter.ShouldExclude(entry.Message) { continue } - entry.Host = cmp.Or(h.cfg.Host, rec.Host) + if isCloudTrail { + entry.Host = cloudtrailHost(message) + } + entry.Host = cmp.Or(h.cfg.Host, entry.Host) entry.Message = h.cfg.Scrubber.Scrub(entry.Message) if err := concurrent.SafeSender(ctx, out, entry); err != nil { return err diff --git a/aws/logs_monitoring_go/internal/handling/scanner.go b/aws/logs_monitoring_go/internal/handling/scanner.go index 4769203ce..ab5b63daa 100644 --- a/aws/logs_monitoring_go/internal/handling/scanner.go +++ b/aws/logs_monitoring_go/internal/handling/scanner.go @@ -80,17 +80,17 @@ func (s *Scanner) splitOnLines(data []byte, atEOF bool) (advance int, token []by return 0, nil, nil } -func scan(r io.Reader, re *regexp.Regexp) iter.Seq2[s3Record, error] { - return func(yield func(s3Record, error) bool) { +func scan(r io.Reader, re *regexp.Regexp) iter.Seq2[string, error] { + return func(yield func(string, error) bool) { scanner := NewScanner(r, re) for scanner.Scan() { message := strings.ToValidUTF8(scanner.Text(), "") - if !yield(s3Record{Message: message}, nil) { + if !yield(message, nil) { return } } if err := scanner.Err(); err != nil { - yield(s3Record{}, err) + yield("", err) } } } From ff0fafde3a129a5c3616c3968265cb24ed6f0a24 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Tue, 5 May 2026 13:19:41 +0200 Subject: [PATCH 4/9] formatting --- .../internal/handling/s3_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/aws/logs_monitoring_go/internal/handling/s3_test.go b/aws/logs_monitoring_go/internal/handling/s3_test.go index 0726d569f..9034380c1 100644 --- a/aws/logs_monitoring_go/internal/handling/s3_test.go +++ b/aws/logs_monitoring_go/internal/handling/s3_test.go @@ -30,7 +30,7 @@ var ( }, } - testCloudTrailKey = "601427279990_CloudTrail_us-east-1_20210503T0000Z_QrttGEk4ZcBTLwj5.json.gz" + testCloudTrailKey = "601427279990_CloudTrail_us-east-1_20210503T0000Z_QrttGEk4ZcBTLwj5.json.gz" testCloudTrailEventRecord = events.S3EventRecord{ S3: events.S3Entity{ Bucket: events.S3Bucket{Name: "trail-bucket"}, @@ -43,11 +43,11 @@ func TestProcessS3Record(t *testing.T) { t.Parallel() tests := map[string]struct { - mockSetup func(m *MockS3APIClient) - cfg *config.Config + mockSetup func(m *MockS3APIClient) + cfg *config.Config eventRecord events.S3EventRecord - want []model.LogEntry - wantErr bool + want []model.LogEntry + wantErr bool }{ "single line": { mockSetup: func(m *MockS3APIClient) { @@ -170,7 +170,7 @@ func TestProcessS3Record(t *testing.T) { Body: io.NopCloser(bytes.NewReader(data)), }, nil) }, - cfg: testutil.EmptyConfig(), + cfg: testutil.EmptyConfig(), eventRecord: testCloudTrailEventRecord, want: []model.LogEntry{ wantCloudTrailEntry( @@ -196,7 +196,7 @@ func TestProcessS3Record(t *testing.T) { Body: io.NopCloser(bytes.NewReader(data)), }, nil) }, - cfg: testutil.EmptyConfig(), + cfg: testutil.EmptyConfig(), eventRecord: testCloudTrailEventRecord, want: []model.LogEntry{ wantCloudTrailEntry( From 8ac49e19ce9405287a1938bb2fcff8e97c96e35f Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Tue, 5 May 2026 13:37:40 +0200 Subject: [PATCH 5/9] Fix: put host only for CloudTrail with correct priority --- aws/logs_monitoring_go/internal/handling/s3.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/aws/logs_monitoring_go/internal/handling/s3.go b/aws/logs_monitoring_go/internal/handling/s3.go index 4c44570d8..c3eca4556 100644 --- a/aws/logs_monitoring_go/internal/handling/s3.go +++ b/aws/logs_monitoring_go/internal/handling/s3.go @@ -94,9 +94,8 @@ func (h S3Handler) processRecord(ctx context.Context, client S3APIClient, out ch } if isCloudTrail { - entry.Host = cloudtrailHost(message) + entry.Host = cmp.Or(h.cfg.Host, cloudtrailHost(message)) } - entry.Host = cmp.Or(h.cfg.Host, entry.Host) entry.Message = h.cfg.Scrubber.Scrub(entry.Message) if err := concurrent.SafeSender(ctx, out, entry); err != nil { return err From 70cf34880f105d425722760ce7a31b38b15a96cf Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Tue, 5 May 2026 17:04:51 +0200 Subject: [PATCH 6/9] Update aws/logs_monitoring_go/internal/handling/cloudtrail.go Co-authored-by: Vincent Boutour --- aws/logs_monitoring_go/internal/handling/cloudtrail.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/aws/logs_monitoring_go/internal/handling/cloudtrail.go b/aws/logs_monitoring_go/internal/handling/cloudtrail.go index 26bf223e3..91d1b4e34 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudtrail.go +++ b/aws/logs_monitoring_go/internal/handling/cloudtrail.go @@ -23,9 +23,7 @@ const ( cloudTrailUserIdentityKey = "userIdentity" ) -var cloudTrailRegex = sync.OnceValue(func() *regexp.Regexp { - return regexp.MustCompile(`\d+_CloudTrail(|-Digest|-Insight)_\w{2}(|-gov|-cn)-\w{4,9}-\d_(|.+)\d{8}T\d{4,6}Z(|.+)\.json\.gz$`) -}) +var cloudTrailRegex = regexp.MustCompile(`\d+_CloudTrail(|-Digest|-Insight)_\w{2}(|-gov|-cn)-\w{4,9}-\d_(|.+)\d{8}T\d{4,6}Z(|.+)\.json\.gz$`) var ec2InstanceRegexp = sync.OnceValue(func() *regexp.Regexp { return regexp.MustCompile(`^arn:aws:sts::.*?:assumed-role/(?P.*?)/(?Pi-([0-9a-f]{8}|[0-9a-f]{17}))$`) From bbac37c5e8e1af1b596a59b697aa1c1ca982f01b Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Tue, 5 May 2026 17:23:37 +0200 Subject: [PATCH 7/9] Delete lazyloading of cloudtrail-related regexes --- .../internal/handling/cloudtrail.go | 15 ++++++--------- .../internal/handling/cloudtrail_test.go | 2 +- aws/logs_monitoring_go/internal/handling/s3.go | 2 +- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/aws/logs_monitoring_go/internal/handling/cloudtrail.go b/aws/logs_monitoring_go/internal/handling/cloudtrail.go index 91d1b4e34..9e328eb24 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudtrail.go +++ b/aws/logs_monitoring_go/internal/handling/cloudtrail.go @@ -15,7 +15,6 @@ import ( "log/slog" "regexp" "strings" - "sync" ) const ( @@ -23,11 +22,10 @@ const ( cloudTrailUserIdentityKey = "userIdentity" ) -var cloudTrailRegex = regexp.MustCompile(`\d+_CloudTrail(|-Digest|-Insight)_\w{2}(|-gov|-cn)-\w{4,9}-\d_(|.+)\d{8}T\d{4,6}Z(|.+)\.json\.gz$`) - -var ec2InstanceRegexp = sync.OnceValue(func() *regexp.Regexp { - return regexp.MustCompile(`^arn:aws:sts::.*?:assumed-role/(?P.*?)/(?Pi-([0-9a-f]{8}|[0-9a-f]{17}))$`) -}) +var ( + cloudTrailRegex = regexp.MustCompile(`\d+_CloudTrail(|-Digest|-Insight)_\w{2}(|-gov|-cn)-\w{4,9}-\d_(|.+)\d{8}T\d{4,6}Z(|.+)\.json\.gz$`) + ec2InstanceRegexp = regexp.MustCompile(`^arn:aws:sts::.*?:assumed-role/(?P.*?)/(?Pi-([0-9a-f]{8}|[0-9a-f]{17}))$`) +) func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] { return func(yield func(string, error) bool) { @@ -107,13 +105,12 @@ func cloudtrailHost(message string) string { return "" } - re := ec2InstanceRegexp() - matches := re.FindStringSubmatch(arn) + matches := ec2InstanceRegexp.FindStringSubmatch(arn) if matches == nil { slog.Debug(arn + " arn did not match an EC2 host instance, cloudtrail host extraction skipped") return "" } - return matches[re.SubexpIndex("host")] + return matches[ec2InstanceRegexp.SubexpIndex("host")] } return "" } diff --git a/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go b/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go index 27ebdf026..b9ea593cd 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go +++ b/aws/logs_monitoring_go/internal/handling/cloudtrail_test.go @@ -57,7 +57,7 @@ func TestCloudTrailRegex(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { t.Parallel() - got := cloudTrailRegex().MatchString(tc.key) + got := cloudTrailRegex.MatchString(tc.key) if got != tc.want { t.Errorf("cloudTrailRegex().MatchString(%q) = %v, want %v", tc.key, got, tc.want) } diff --git a/aws/logs_monitoring_go/internal/handling/s3.go b/aws/logs_monitoring_go/internal/handling/s3.go index c3eca4556..cafae6d0d 100644 --- a/aws/logs_monitoring_go/internal/handling/s3.go +++ b/aws/logs_monitoring_go/internal/handling/s3.go @@ -77,7 +77,7 @@ func (h S3Handler) processRecord(ctx context.Context, client S3APIClient, out ch }() var records iter.Seq2[string, error] - isCloudTrail := cloudTrailRegex().MatchString(key) + isCloudTrail := cloudTrailRegex.MatchString(key) if isCloudTrail { records = decodeCloudTrail(body) } else { From a1475a56556620664b8314fd26685d9467337382 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Wed, 6 May 2026 11:43:07 +0200 Subject: [PATCH 8/9] keep old Python behavior on cloudtrail host --- aws/logs_monitoring_go/internal/handling/cloudwatch.go | 6 ++---- aws/logs_monitoring_go/internal/handling/cloudwatch_test.go | 2 +- aws/logs_monitoring_go/internal/handling/s3.go | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/aws/logs_monitoring_go/internal/handling/cloudwatch.go b/aws/logs_monitoring_go/internal/handling/cloudwatch.go index 4e5dafd8a..25249f419 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudwatch.go +++ b/aws/logs_monitoring_go/internal/handling/cloudwatch.go @@ -135,10 +135,8 @@ func (h CloudwatchHandler) newCloudwatchLogEntry(event events.CloudwatchLogsLogE entry.ID = event.ID entry.Timestamp = event.Timestamp - if h.cfg.Host == "" && entry.Source == sourceCloudtrail { - if host := cloudtrailHost(event.Message); host != "" { - entry.Host = host - } + if entry.Source == sourceCloudtrail { + entry.Host = cloudtrailHost(event.Message) } return entry diff --git a/aws/logs_monitoring_go/internal/handling/cloudwatch_test.go b/aws/logs_monitoring_go/internal/handling/cloudwatch_test.go index 65f96a249..6caf31faf 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudwatch_test.go +++ b/aws/logs_monitoring_go/internal/handling/cloudwatch_test.go @@ -178,7 +178,7 @@ func TestCloudwatchHandler_Handle(t *testing.T) { SourceCategory: "aws", Service: "cloudtrail", Tags: model.Tags{"service:cloudtrail"}, - Host: "cloudtrail-logs", + Host: "", ID: "ct2", Timestamp: 1620000000000, Metadata: model.CloudwatchMetadata{ diff --git a/aws/logs_monitoring_go/internal/handling/s3.go b/aws/logs_monitoring_go/internal/handling/s3.go index cafae6d0d..38b6ad9a8 100644 --- a/aws/logs_monitoring_go/internal/handling/s3.go +++ b/aws/logs_monitoring_go/internal/handling/s3.go @@ -94,7 +94,7 @@ func (h S3Handler) processRecord(ctx context.Context, client S3APIClient, out ch } if isCloudTrail { - entry.Host = cmp.Or(h.cfg.Host, cloudtrailHost(message)) + entry.Host = cloudtrailHost(message) } entry.Message = h.cfg.Scrubber.Scrub(entry.Message) if err := concurrent.SafeSender(ctx, out, entry); err != nil { From a309618648786fc0e822811504d6a9ed2ac44a35 Mon Sep 17 00:00:00 2001 From: Nabil Dakkoune Date: Wed, 6 May 2026 14:23:58 +0200 Subject: [PATCH 9/9] feat(go-forwarder): add EventBridge --- aws/logs_monitoring_go/cmd/forwarder/main.go | 21 +-- .../internal/forwarding/storage.go | 8 +- .../internal/handling/cloudwatch.go | 2 +- .../internal/handling/eventbridge.go | 94 ++++++++++++ .../internal/handling/eventbridge_test.go | 132 +++++++++++++++++ .../internal/handling/handler.go | 19 ++- .../internal/handling/kinesis.go | 4 +- .../internal/handling/s3.go | 2 +- .../internal/parsing/content.go | 24 ++++ .../internal/parsing/content_string.go | 28 ++++ .../internal/parsing/eventbridge.go | 136 ++++++++++++++++++ .../internal/parsing/invocation_source.go | 89 ------------ .../parsing/invocation_source_bench_test.go | 67 --------- .../parsing/invocation_source_string.go | 29 ---- .../parsing/invocation_source_test.go | 52 ------- .../internal/parsing/parse.go | 92 ++++++++++++ .../internal/parsing/parse_test.go | 94 ++++++++++++ .../internal/pipeline/pipeline.go | 34 +++-- .../internal/pipeline/run.go | 25 ---- 19 files changed, 653 insertions(+), 299 deletions(-) create mode 100644 aws/logs_monitoring_go/internal/handling/eventbridge.go create mode 100644 aws/logs_monitoring_go/internal/handling/eventbridge_test.go create mode 100644 aws/logs_monitoring_go/internal/parsing/content.go create mode 100644 aws/logs_monitoring_go/internal/parsing/content_string.go create mode 100644 aws/logs_monitoring_go/internal/parsing/eventbridge.go delete mode 100644 aws/logs_monitoring_go/internal/parsing/invocation_source.go delete mode 100644 aws/logs_monitoring_go/internal/parsing/invocation_source_bench_test.go delete mode 100644 aws/logs_monitoring_go/internal/parsing/invocation_source_string.go delete mode 100644 aws/logs_monitoring_go/internal/parsing/invocation_source_test.go create mode 100644 aws/logs_monitoring_go/internal/parsing/parse.go create mode 100644 aws/logs_monitoring_go/internal/parsing/parse_test.go delete mode 100644 aws/logs_monitoring_go/internal/pipeline/run.go diff --git a/aws/logs_monitoring_go/cmd/forwarder/main.go b/aws/logs_monitoring_go/cmd/forwarder/main.go index d31ad219a..46ba69d2e 100644 --- a/aws/logs_monitoring_go/cmd/forwarder/main.go +++ b/aws/logs_monitoring_go/cmd/forwarder/main.go @@ -8,11 +8,9 @@ package main import ( "context" "encoding/json" - "errors" + "fmt" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline" @@ -33,24 +31,15 @@ func main() { panic(err) } - cwHandler := handling.NewCloudwatch(cfg) - kinesisHandler := handling.NewKinesis(cfg) - s3Handler := handling.NewS3(cfg) - handling.Register(parsing.InvocationSourceCloudwatchLogs, cwHandler) - handling.Register(parsing.InvocationSourceKinesis, kinesisHandler) - handling.Register(parsing.InvocationSourceS3, s3Handler) - lambda.Start(handleRequest(cfg)) } func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawMessage) error { return func(ctx context.Context, event json.RawMessage) error { - invocation := parsing.DetectInvocationSource(event) - if invocation == parsing.InvocationSourceUnknown { - return errors.New("unknown invocation") + parsed, err := parsing.Parse(event) + if err != nil { + return fmt.Errorf("parse: %w", err) } - - run := pipeline.NewRun(cfg, handling.Handlers[invocation], forwarding.Storage(invocation)) - return pipeline.Start(ctx, event, run) + return pipeline.Start(ctx, parsed, cfg) } } diff --git a/aws/logs_monitoring_go/internal/forwarding/storage.go b/aws/logs_monitoring_go/internal/forwarding/storage.go index 292ac75e9..53b1bd03b 100644 --- a/aws/logs_monitoring_go/internal/forwarding/storage.go +++ b/aws/logs_monitoring_go/internal/forwarding/storage.go @@ -12,11 +12,11 @@ const ( s3Storage = "s3" ) -func Storage(source parsing.InvocationSource) string { - switch source { - case parsing.InvocationSourceS3: +func StorageFromContentType(contentType parsing.ContentType) string { + switch contentType { + case parsing.ContentTypeS3: return s3Storage - case parsing.InvocationSourceCloudwatchLogs, parsing.InvocationSourceKinesis: + case parsing.ContentTypeCloudwatchLogs, parsing.ContentTypeKinesis: return cloudwatchStorage default: return "" diff --git a/aws/logs_monitoring_go/internal/handling/cloudwatch.go b/aws/logs_monitoring_go/internal/handling/cloudwatch.go index 25249f419..a8cc4a751 100644 --- a/aws/logs_monitoring_go/internal/handling/cloudwatch.go +++ b/aws/logs_monitoring_go/internal/handling/cloudwatch.go @@ -42,7 +42,7 @@ func NewCloudwatch(cfg *config.Config) *CloudwatchHandler { } } -func (h CloudwatchHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { +func (h *CloudwatchHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { var cwEvent events.CloudwatchLogsEvent if err := json.Unmarshal(event, &cwEvent); err != nil { return fmt.Errorf("unmarshal: %w", err) diff --git a/aws/logs_monitoring_go/internal/handling/eventbridge.go b/aws/logs_monitoring_go/internal/handling/eventbridge.go new file mode 100644 index 000000000..06ab7619e --- /dev/null +++ b/aws/logs_monitoring_go/internal/handling/eventbridge.go @@ -0,0 +1,94 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package handling + +import ( + "bytes" + "cmp" + "context" + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" +) + +type EventBridgeHandler struct { + cfg *config.Config +} + +func NewEventBridge(cfg *config.Config) *EventBridgeHandler { + return &EventBridgeHandler{ + cfg: cfg, + } +} + +func (h *EventBridgeHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { + lambdaOrigin, err := model.GetLambdaOrigin(ctx) + if err != nil { + return fmt.Errorf("get lambda origin: %w", err) + } + + ebSource, err := decodeEventBridgeSource(event) + if err != nil { + return err + } + source := cmp.Or(h.cfg.Source, ebSource) + service := cmp.Or(h.cfg.Service, source) + + entry := model.NewLogEntry() + entry.Message = string(event) + entry.Source = source + entry.Service = service + entry.Tags = h.cfg.Tags + entry.Metadata = lambdaOrigin + + if h.cfg.Filter.ShouldExclude(entry.Message) { + return nil + } + + entry.Message = h.cfg.Scrubber.Scrub(entry.Message) + return concurrent.SafeSender(ctx, out, entry) +} + +func decodeEventBridgeSource(event json.RawMessage) (string, error) { + dec := json.NewDecoder(bytes.NewReader(event)) + + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + return "", errors.New("decode eventbridge source: expected '{'") + } + + for dec.More() { + key, err := dec.Token() + if err != nil { + return "", fmt.Errorf("decode eventbridge source: read key: %w", err) + } + if key == "source" { + var source string + if err := dec.Decode(&source); err != nil { + return "", fmt.Errorf("decode eventbridge source: %w", err) + } + return eventBridgeSource(source), nil + } + var skip json.RawMessage + if err := dec.Decode(&skip); err != nil { + return "", fmt.Errorf("decode eventbridge source: skip field: %w", err) + } + } + + return "", nil +} + +func eventBridgeSource(source string) string { + _, after, found := strings.Cut(source, ".") + if found { + return after + } + return sourceCloudwatch +} diff --git a/aws/logs_monitoring_go/internal/handling/eventbridge_test.go b/aws/logs_monitoring_go/internal/handling/eventbridge_test.go new file mode 100644 index 000000000..422e94e13 --- /dev/null +++ b/aws/logs_monitoring_go/internal/handling/eventbridge_test.go @@ -0,0 +1,132 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package handling + +import ( + "encoding/json" + "testing" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil" + "github.com/google/go-cmp/cmp" +) + +func TestEventBridgeHandler_Handle(t *testing.T) { + t.Parallel() + + ctx := testutil.LambdaContext(t) + + tests := map[string]struct { + event json.RawMessage + cfg *config.Config + want []model.LogEntry + wantErr bool + }{ + "scheduled event": { + event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`), + cfg: testutil.EmptyConfig(), + want: []model.LogEntry{ + { + Message: `{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`, + Source: "events", + SourceCategory: "aws", + Service: "events", + Metadata: testutil.LambdaOrigin(), + }, + }, + }, + "ec2 event": { + event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{"instance-id":"i-123","state":"running"}}`), + cfg: testutil.EmptyConfig(), + want: []model.LogEntry{ + { + Message: `{"version":"0","id":"abc","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{"instance-id":"i-123","state":"running"}}`, + Source: "ec2", + SourceCategory: "aws", + Service: "ec2", + Metadata: testutil.LambdaOrigin(), + }, + }, + }, + "custom source override": { + event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`), + cfg: &config.Config{Source: "custom-source"}, + want: []model.LogEntry{ + { + Message: `{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","account":"123456789012","time":"1970-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{}}`, + Source: "custom-source", + SourceCategory: "aws", + Service: "custom-source", + Metadata: testutil.LambdaOrigin(), + }, + }, + }, + "invalid JSON": { + event: json.RawMessage(`not json`), + cfg: testutil.EmptyConfig(), + wantErr: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + handler := NewEventBridge(tc.cfg) + out := make(chan model.LogEntry, len(tc.want)) + + err := handler.Handle(ctx, tc.event, out) + close(out) + + if tc.wantErr { + if err == nil { + t.Fatal("expected error, got nil") + } + return + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + var got []model.LogEntry + for entry := range out { + got = append(got, entry) + } + + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Errorf("entries mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func TestEventBridgeSource(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + source string + want string + }{ + "aws.events": {source: "aws.events", want: "events"}, + "aws.ec2": {source: "aws.ec2", want: "ec2"}, + "aws.s3": {source: "aws.s3", want: "s3"}, + "custom.app": {source: "custom.app", want: "app"}, + "no dot": {source: "nodot", want: "cloudwatch"}, + "empty string": {source: "", want: "cloudwatch"}, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + got := eventBridgeSource(tc.source) + if got != tc.want { + t.Errorf("got %q, want %q", got, tc.want) + } + }) + } +} diff --git a/aws/logs_monitoring_go/internal/handling/handler.go b/aws/logs_monitoring_go/internal/handling/handler.go index 6edab85a3..49d8b204d 100644 --- a/aws/logs_monitoring_go/internal/handling/handler.go +++ b/aws/logs_monitoring_go/internal/handling/handler.go @@ -8,17 +8,28 @@ package handling import ( "context" "encoding/json" + "fmt" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" ) -var Handlers = make(map[parsing.InvocationSource]Handler) - type Handler interface { Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error } -func Register(invocation parsing.InvocationSource, handler Handler) { - Handlers[invocation] = handler +func NewHandler(ct parsing.ContentType, cfg *config.Config) (Handler, error) { + switch ct { + case parsing.ContentTypeCloudwatchLogs: + return NewCloudwatch(cfg), nil + case parsing.ContentTypeS3: + return NewS3(cfg), nil + case parsing.ContentTypeKinesis: + return NewKinesis(cfg), nil + case parsing.ContentTypeEventBridge: + return NewEventBridge(cfg), nil + default: + return nil, fmt.Errorf("unsupported content type: %v", ct) + } } diff --git a/aws/logs_monitoring_go/internal/handling/kinesis.go b/aws/logs_monitoring_go/internal/handling/kinesis.go index a1caf3426..4ce403f58 100644 --- a/aws/logs_monitoring_go/internal/handling/kinesis.go +++ b/aws/logs_monitoring_go/internal/handling/kinesis.go @@ -26,13 +26,13 @@ func NewKinesis(cfg *config.Config) *KinesisHandler { } } -func (h KinesisHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { +func (h *KinesisHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { var kinesisEvent events.KinesisEvent if err := json.Unmarshal(event, &kinesisEvent); err != nil { return fmt.Errorf("unmarshal: %w", err) } - cw := CloudwatchHandler(h) + cw := CloudwatchHandler(*h) for i, record := range kinesisEvent.Records { cwData, err := decompressCloudwatchLogs(record.Kinesis.Data) if err != nil { diff --git a/aws/logs_monitoring_go/internal/handling/s3.go b/aws/logs_monitoring_go/internal/handling/s3.go index 38b6ad9a8..4a3c35e0e 100644 --- a/aws/logs_monitoring_go/internal/handling/s3.go +++ b/aws/logs_monitoring_go/internal/handling/s3.go @@ -38,7 +38,7 @@ func NewS3(cfg *config.Config) *S3Handler { } } -func (h S3Handler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { +func (h *S3Handler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error { var s3Event events.S3Event if err := json.Unmarshal(event, &s3Event); err != nil { return fmt.Errorf("unmarshal: %w", err) diff --git a/aws/logs_monitoring_go/internal/parsing/content.go b/aws/logs_monitoring_go/internal/parsing/content.go new file mode 100644 index 000000000..e0b96c0d6 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/content.go @@ -0,0 +1,24 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import "encoding/json" + +//go:generate stringer -type ContentType -trimprefix ContentType -output content_string.go +type ContentType int + +const ( + ContentTypeUnknown ContentType = iota + ContentTypeCloudwatchLogs + ContentTypeS3 + ContentTypeKinesis + ContentTypeEventBridge +) + +type ParsedEvent struct { + ContentType ContentType + Payload json.RawMessage +} diff --git a/aws/logs_monitoring_go/internal/parsing/content_string.go b/aws/logs_monitoring_go/internal/parsing/content_string.go new file mode 100644 index 000000000..3a0a6d398 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/content_string.go @@ -0,0 +1,28 @@ +// Code generated by "stringer -type ContentType -trimprefix ContentType -output content_string.go"; DO NOT EDIT. + +package parsing + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[ContentTypeUnknown-0] + _ = x[ContentTypeCloudwatchLogs-1] + _ = x[ContentTypeS3-2] + _ = x[ContentTypeKinesis-3] + _ = x[ContentTypeEventBridge-4] +} + +const _ContentType_name = "UnknownCloudwatchLogsS3KinesisEventBridge" + +var _ContentType_index = [...]uint8{0, 7, 21, 23, 30, 41} + +func (i ContentType) String() string { + idx := int(i) - 0 + if i < 0 || idx >= len(_ContentType_index)-1 { + return "ContentType(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _ContentType_name[_ContentType_index[idx]:_ContentType_index[idx+1]] +} diff --git a/aws/logs_monitoring_go/internal/parsing/eventbridge.go b/aws/logs_monitoring_go/internal/parsing/eventbridge.go new file mode 100644 index 000000000..13c0ba726 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/eventbridge.go @@ -0,0 +1,136 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "strings" + + "github.com/aws/aws-lambda-go/events" +) + +const ( + eventBridgeSourceS3 = "aws.s3" + eventBridgeDetailTypeS3 = "Object Created" +) + +func parseEventBridge(event json.RawMessage) ([]ParsedEvent, error) { + source, detailType, detail, err := decodeEventBridgeEnvelope(event) + if err != nil { + return nil, fmt.Errorf("decode eventbridge: %w", err) + } + + if source == eventBridgeSourceS3 && strings.Contains(detailType, eventBridgeDetailTypeS3) { + s3Event, err := buildS3EventFromEventBridge(detail) + if err != nil { + return nil, fmt.Errorf("build s3 event from eventbridge: %w", err) + } + return []ParsedEvent{{ContentTypeS3, s3Event}}, nil + } + + return []ParsedEvent{{ContentTypeEventBridge, event}}, nil +} + +func decodeEventBridgeEnvelope(event json.RawMessage) (source, detailType string, detail json.RawMessage, err error) { + dec := json.NewDecoder(bytes.NewReader(event)) + + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + return "", "", nil, errors.New("eventbridge envelope: expected '{'") + } + + for dec.More() { + key, err := dec.Token() + if err != nil { + return "", "", nil, fmt.Errorf("read key: %w", err) + } + switch key { + case "source": + if err := dec.Decode(&source); err != nil { + return "", "", nil, fmt.Errorf("decode source: %w", err) + } + case "detail-type": + if err := dec.Decode(&detailType); err != nil { + return "", "", nil, fmt.Errorf("decode detail-type: %w", err) + } + case "detail": + if err := dec.Decode(&detail); err != nil { + return "", "", nil, fmt.Errorf("decode detail: %w", err) + } + default: + var skip json.RawMessage + if err := dec.Decode(&skip); err != nil { + return "", "", nil, fmt.Errorf("skip field: %w", err) + } + } + } + + return source, detailType, detail, nil +} + +func buildS3EventFromEventBridge(detail json.RawMessage) (json.RawMessage, error) { + bucketName, objectKey, err := decodeEventBridgeS3Detail(detail) + if err != nil { + return nil, fmt.Errorf("decode eventbridge s3 detail: %w", err) + } + + s3Event := events.S3Event{ + Records: []events.S3EventRecord{{ + EventSource: eventSourceS3, + S3: events.S3Entity{ + Bucket: events.S3Bucket{Name: bucketName}, + Object: events.S3Object{Key: objectKey, URLDecodedKey: objectKey}, + }, + }}, + } + payload, err := json.Marshal(s3Event) + if err != nil { + return nil, fmt.Errorf("marshal synthetic s3 event: %w", err) + } + return payload, nil +} + +func decodeEventBridgeS3Detail(detail json.RawMessage) (bucket, key string, err error) { + dec := json.NewDecoder(bytes.NewReader(detail)) + + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + return "", "", errors.New("eventbridge s3 detail: expected '{'") + } + + for dec.More() { + k, err := dec.Token() + if err != nil { + return "", "", fmt.Errorf("read key: %w", err) + } + switch k { + case "bucket": + var b struct { + Name string `json:"name"` + } + if err := dec.Decode(&b); err != nil { + return "", "", fmt.Errorf("decode bucket: %w", err) + } + bucket = b.Name + case "object": + var o struct { + Key string `json:"key"` + } + if err := dec.Decode(&o); err != nil { + return "", "", fmt.Errorf("decode object: %w", err) + } + key = o.Key + default: + var skip json.RawMessage + if err := dec.Decode(&skip); err != nil { + return "", "", fmt.Errorf("skip field: %w", err) + } + } + } + + return bucket, key, nil +} diff --git a/aws/logs_monitoring_go/internal/parsing/invocation_source.go b/aws/logs_monitoring_go/internal/parsing/invocation_source.go deleted file mode 100644 index d37fa41da..000000000 --- a/aws/logs_monitoring_go/internal/parsing/invocation_source.go +++ /dev/null @@ -1,89 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package parsing - -import ( - "bytes" - "encoding/json" -) - -//go:generate stringer -type InvocationSource -trimprefix InvocationSource -output invocation_source_string.go -type InvocationSource int - -const ( - InvocationSourceUnknown InvocationSource = iota - InvocationSourceCloudwatchLogs - InvocationSourceS3 - InvocationSourceSNS - InvocationSourceSQS - InvocationSourceKinesis -) - -func DetectInvocationSource(event json.RawMessage) InvocationSource { - dec := json.NewDecoder(bytes.NewReader(event)) - - t, err := dec.Token() - if err != nil || t != json.Delim('{') { - return InvocationSourceUnknown - } - - key, err := dec.Token() - if err != nil { - return InvocationSourceUnknown - } - if key == "awslogs" { - return InvocationSourceCloudwatchLogs - } - - if key == "Records" { - return detectFromRecords(dec) - } - - return InvocationSourceUnknown -} - -func detectFromRecords(dec *json.Decoder) InvocationSource { - t, err := dec.Token() - if err != nil || t != json.Delim('[') { - return InvocationSourceUnknown - } - - t, err = dec.Token() - if err != nil || t != json.Delim('{') { - return InvocationSourceUnknown - } - - for dec.More() { - key, err := dec.Token() - if err != nil { - return InvocationSourceUnknown - } - if key == "eventSource" { - val, err := dec.Token() - if err != nil { - return InvocationSourceUnknown - } - switch val { - case "aws:s3": - return InvocationSourceS3 - case "aws:sns": - return InvocationSourceSNS - case "aws:sqs": - return InvocationSourceSQS - case "aws:kinesis": - return InvocationSourceKinesis - default: - return InvocationSourceUnknown - } - } - var skip json.RawMessage - if err := dec.Decode(&skip); err != nil { - return InvocationSourceUnknown - } - } - - return InvocationSourceUnknown -} diff --git a/aws/logs_monitoring_go/internal/parsing/invocation_source_bench_test.go b/aws/logs_monitoring_go/internal/parsing/invocation_source_bench_test.go deleted file mode 100644 index 5f0ab09d5..000000000 --- a/aws/logs_monitoring_go/internal/parsing/invocation_source_bench_test.go +++ /dev/null @@ -1,67 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package parsing - -import ( - "bytes" - "encoding/json" - "fmt" - "strings" - "testing" -) - -const benchS3Records = 500 - -var ( - smallCWEvent = json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`) - largeS3EventFirst = makeS3EventSourceFirst(benchS3Records) - largeS3EventLast = makeS3EventSourceLast(benchS3Records) -) - -var benchCases = []struct { - name string - event json.RawMessage -}{ - {"SmallCW", smallCWEvent}, - {"LargeS3_SourceFirst", largeS3EventFirst}, - {"LargeS3_SourceLast", largeS3EventLast}, -} - -func BenchmarkDetectInvocationSource(b *testing.B) { - for _, tc := range benchCases { - b.Run(tc.name, func(b *testing.B) { - for b.Loop() { - DetectInvocationSource(tc.event) - } - }) - } -} - -func makeS3EventSourceFirst(nRecords int) json.RawMessage { - var buf bytes.Buffer - buf.WriteString(`{"Records":[`) - for i := range nRecords { - if i > 0 { - buf.WriteByte(',') - } - fmt.Fprintf(&buf, `{"eventSource":"aws:s3","s3":{"bucket":{"name":"bucket-%d"},"object":{"key":"%s"}}}`, i, strings.Repeat("x", 200)) - } - buf.WriteString(`]}`) - return json.RawMessage(buf.Bytes()) -} - -func makeS3EventSourceLast(nRecords int) json.RawMessage { - var buf bytes.Buffer - buf.WriteString(`{"Records":[`) - for i := range nRecords { - if i > 0 { - buf.WriteByte(',') - } - fmt.Fprintf(&buf, `{"s3":{"bucket":{"name":"bucket-%d"},"object":{"key":"%s"}},"eventSource":"aws:s3"}`, i, strings.Repeat("x", 200)) - } - buf.WriteString(`]}`) - return json.RawMessage(buf.Bytes()) -} diff --git a/aws/logs_monitoring_go/internal/parsing/invocation_source_string.go b/aws/logs_monitoring_go/internal/parsing/invocation_source_string.go deleted file mode 100644 index 4e659dadd..000000000 --- a/aws/logs_monitoring_go/internal/parsing/invocation_source_string.go +++ /dev/null @@ -1,29 +0,0 @@ -// Code generated by "stringer -type InvocationSource -trimprefix InvocationSource -output invocation_source_string.go"; DO NOT EDIT. - -package parsing - -import "strconv" - -func _() { - // An "invalid array index" compiler error signifies that the constant values have changed. - // Re-run the stringer command to generate them again. - var x [1]struct{} - _ = x[InvocationSourceUnknown-0] - _ = x[InvocationSourceCloudwatchLogs-1] - _ = x[InvocationSourceS3-2] - _ = x[InvocationSourceSNS-3] - _ = x[InvocationSourceSQS-4] - _ = x[InvocationSourceKinesis-5] -} - -const _InvocationSource_name = "UnknownCloudwatchLogsS3SNSSQSKinesis" - -var _InvocationSource_index = [...]uint8{0, 7, 21, 23, 26, 29, 36} - -func (i InvocationSource) String() string { - idx := int(i) - 0 - if i < 0 || idx >= len(_InvocationSource_index)-1 { - return "InvocationSource(" + strconv.FormatInt(int64(i), 10) + ")" - } - return _InvocationSource_name[_InvocationSource_index[idx]:_InvocationSource_index[idx+1]] -} diff --git a/aws/logs_monitoring_go/internal/parsing/invocation_source_test.go b/aws/logs_monitoring_go/internal/parsing/invocation_source_test.go deleted file mode 100644 index 2a76478b9..000000000 --- a/aws/logs_monitoring_go/internal/parsing/invocation_source_test.go +++ /dev/null @@ -1,52 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package parsing - -import ( - "encoding/json" - "testing" -) - -func TestDetectInvocationSource(t *testing.T) { - t.Parallel() - - tests := map[string]struct { - event json.RawMessage - wantKey InvocationSource - }{ - "cloudwatch": { - event: json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`), - wantKey: InvocationSourceCloudwatchLogs, - }, - "s3": { - event: json.RawMessage(`{"Records":[{"s3":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}},"eventSource":"aws:s3"}]}`), - wantKey: InvocationSourceS3, - }, - "empty object": { - event: json.RawMessage(`{}`), - wantKey: InvocationSourceUnknown, - }, - "not JSON": { - event: json.RawMessage(`not a json`), - wantKey: InvocationSourceUnknown, - }, - "empty input": { - event: json.RawMessage(``), - wantKey: InvocationSourceUnknown, - }, - } - - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - t.Parallel() - - got := DetectInvocationSource(tc.event) - if got != tc.wantKey { - t.Errorf("got %d, want %d", got, tc.wantKey) - } - }) - } -} diff --git a/aws/logs_monitoring_go/internal/parsing/parse.go b/aws/logs_monitoring_go/internal/parsing/parse.go new file mode 100644 index 000000000..aa000460a --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/parse.go @@ -0,0 +1,92 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" +) + +const ( + eventSourceS3 = "aws:s3" + eventSourceKinesis = "aws:kinesis" +) + +func Parse(event json.RawMessage) ([]ParsedEvent, error) { + dec := json.NewDecoder(bytes.NewReader(event)) + + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + return nil, errors.New("expected JSON object") + } + + for dec.More() { + key, err := dec.Token() + if err != nil { + return nil, fmt.Errorf("read key: %w", err) + } + + switch key { + case "awslogs": + return []ParsedEvent{{ContentTypeCloudwatchLogs, event}}, nil + case "Records": + return parseRecords(event, dec) + case "detail": + return parseEventBridge(event) + default: + var skip json.RawMessage + if err := dec.Decode(&skip); err != nil { + return nil, fmt.Errorf("skip value: %w", err) + } + } + } + + return nil, errors.New("unsupported event") +} + +func parseRecords(event json.RawMessage, dec *json.Decoder) ([]ParsedEvent, error) { + if t, err := dec.Token(); err != nil || t != json.Delim('[') { + return nil, errors.New("parse records: expected array") + } + if t, err := dec.Token(); err != nil || t != json.Delim('{') { + return nil, errors.New("parse records: expected object") + } + + for dec.More() { + key, err := dec.Token() + if err != nil { + return nil, fmt.Errorf("read record key: %w", err) + } + if key != "eventSource" && key != "EventSource" { + var skip json.RawMessage + if err := dec.Decode(&skip); err != nil { + return nil, fmt.Errorf("skip record field: %w", err) + } + continue + } + + val, err := dec.Token() + if err != nil { + return nil, fmt.Errorf("read event source value: %w", err) + } + + source, ok := val.(string) + if !ok { + return nil, fmt.Errorf("eventSource is not a string: %v", val) + } + switch source { + case eventSourceS3: + return []ParsedEvent{{ContentTypeS3, event}}, nil + case eventSourceKinesis: + return []ParsedEvent{{ContentTypeKinesis, event}}, nil + default: + return nil, fmt.Errorf("parse records: unsupported event source %q", source) + } + } + + return nil, errors.New("parse records: no eventSource found") +} diff --git a/aws/logs_monitoring_go/internal/parsing/parse_test.go b/aws/logs_monitoring_go/internal/parsing/parse_test.go new file mode 100644 index 000000000..545aeafb5 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/parse_test.go @@ -0,0 +1,94 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "encoding/json" + "testing" +) + +func TestParse(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + event json.RawMessage + want []ContentType + wantErr bool + }{ + "cloudwatch logs": { + event: json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`), + want: []ContentType{ContentTypeCloudwatchLogs}, + }, + "s3": { + event: json.RawMessage(`{"Records":[{"eventSource":"aws:s3","s3":{"bucket":{"name":"b"},"object":{"key":"k"}}}]}`), + want: []ContentType{ContentTypeS3}, + }, + "kinesis": { + event: json.RawMessage(`{"Records":[{"eventSource":"aws:kinesis","kinesis":{"data":"dGVzdA=="}}]}`), + want: []ContentType{ContentTypeKinesis}, + }, + "eventbridge generic": { + event: json.RawMessage(`{"version":"0","id":"abc","detail-type":"Scheduled Event","source":"aws.events","detail":{}}`), + want: []ContentType{ContentTypeEventBridge}, + }, + "eventbridge s3": { + event: json.RawMessage(`{"version":"0","detail-type":"Object Created","source":"aws.s3","detail":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}}}`), + want: []ContentType{ContentTypeS3}, + }, + "eventbridge ec2": { + event: json.RawMessage(`{"version":"0","detail-type":"EC2 Instance State-change Notification","source":"aws.ec2","detail":{"instance-id":"i-123"}}`), + want: []ContentType{ContentTypeEventBridge}, + }, + "eventbridge s3 without object created": { + event: json.RawMessage(`{"version":"0","detail-type":"Object Deleted","source":"aws.s3","detail":{}}`), + want: []ContentType{ContentTypeEventBridge}, + }, + "empty object": { + event: json.RawMessage(`{}`), + wantErr: true, + }, + "unsupported source": { + event: json.RawMessage(`{"Records":[{"eventSource":"aws:dynamodb"}]}`), + wantErr: true, + }, + "not JSON": { + event: json.RawMessage(`not json`), + wantErr: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + got, err := Parse(tc.event) + + if tc.wantErr { + if err == nil { + t.Fatal("expected error, got nil") + } + return + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(got) != len(tc.want) { + t.Fatalf("got %d events, want %d", len(got), len(tc.want)) + } + + for i, pe := range got { + if pe.ContentType != tc.want[i] { + t.Errorf("event[%d]: got ContentType %v, want %v", i, pe.ContentType, tc.want[i]) + } + if len(pe.Payload) == 0 { + t.Errorf("event[%d]: empty payload", i) + } + } + }) + } +} diff --git a/aws/logs_monitoring_go/internal/pipeline/pipeline.go b/aws/logs_monitoring_go/internal/pipeline/pipeline.go index 28ddf1598..f762ff6c2 100644 --- a/aws/logs_monitoring_go/internal/pipeline/pipeline.go +++ b/aws/logs_monitoring_go/internal/pipeline/pipeline.go @@ -7,31 +7,47 @@ package pipeline import ( "context" - "encoding/json" + "errors" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" "golang.org/x/sync/errgroup" ) func Start( ctx context.Context, - event json.RawMessage, - run *Run, + parsedEvents []parsing.ParsedEvent, + cfg *config.Config, ) error { - g, ctx := errgroup.WithContext(ctx) + if len(parsedEvents) == 0 { + return errors.New("no events to process") + } + + eg, ctx := errgroup.WithContext(ctx) entries := make(chan model.LogEntry) - forwarder := forwarding.NewForwarder(run.Cfg, forwarding.Client, run.Storage) + forwarder := forwarding.NewForwarder(cfg, forwarding.Client, forwarding.StorageFromContentType(parsedEvents[0].ContentType)) - g.Go(func() error { + eg.Go(func() error { defer close(entries) - return run.Handler.Handle(ctx, event, entries) + for _, parsedEvent := range parsedEvents { + handler, err := handling.NewHandler(parsedEvent.ContentType, cfg) + if err != nil { + return err + } + if err := handler.Handle(ctx, parsedEvent.Payload, entries); err != nil { + return err + } + } + return nil }) - g.Go(func() error { + eg.Go(func() error { return forwarder.Start(ctx, entries) }) - return g.Wait() + return eg.Wait() } diff --git a/aws/logs_monitoring_go/internal/pipeline/run.go b/aws/logs_monitoring_go/internal/pipeline/run.go deleted file mode 100644 index 5aee557a1..000000000 --- a/aws/logs_monitoring_go/internal/pipeline/run.go +++ /dev/null @@ -1,25 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2026-Present Datadog, Inc. - -package pipeline - -import ( - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling" -) - -type Run struct { - Cfg *config.Config - Handler handling.Handler - Storage string -} - -func NewRun(cfg *config.Config, handler handling.Handler, storage string) *Run { - return &Run{ - Cfg: cfg, - Handler: handler, - Storage: storage, - } -}