Skip to content
21 changes: 5 additions & 16 deletions aws/logs_monitoring_go/cmd/forwarder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/handling"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline"

Expand All @@ -33,24 +31,15 @@ func main() {
panic(err)
}

cwHandler := handling.NewCloudwatch(cfg)
kinesisHandler := handling.NewKinesis(cfg)
s3Handler := handling.NewS3(cfg)
handling.Register(parsing.InvocationSourceCloudwatchLogs, cwHandler)
handling.Register(parsing.InvocationSourceKinesis, kinesisHandler)
handling.Register(parsing.InvocationSourceS3, s3Handler)

lambda.Start(handleRequest(cfg))
}

func handleRequest(cfg *config.Config) func(ctx context.Context, event json.RawMessage) error {
return func(ctx context.Context, event json.RawMessage) error {
invocation := parsing.DetectInvocationSource(event)
if invocation == parsing.InvocationSourceUnknown {
return errors.New("unknown invocation")
parsed, err := parsing.Parse(event)
if err != nil {
return fmt.Errorf("parse: %w", err)
}

run := pipeline.NewRun(cfg, handling.Handlers[invocation], forwarding.Storage(invocation))
return pipeline.Start(ctx, event, run)
return pipeline.Start(ctx, parsed, cfg)
}
}
8 changes: 4 additions & 4 deletions aws/logs_monitoring_go/internal/forwarding/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ const (
s3Storage = "s3"
)

func Storage(source parsing.InvocationSource) string {
switch source {
case parsing.InvocationSourceS3:
func StorageFromContentType(contentType parsing.ContentType) string {
switch contentType {
case parsing.ContentTypeS3:
return s3Storage
case parsing.InvocationSourceCloudwatchLogs, parsing.InvocationSourceKinesis:
case parsing.ContentTypeCloudwatchLogs, parsing.ContentTypeKinesis:
return cloudwatchStorage
default:
return ""
Expand Down
118 changes: 118 additions & 0 deletions aws/logs_monitoring_go/internal/handling/cloudtrail.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package handling

import (
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"iter"
"log/slog"
"regexp"
"strings"
)

const (
cloudTrailARNKey = "arn"
cloudTrailUserIdentityKey = "userIdentity"
)

var (
cloudTrailRegex = regexp.MustCompile(`\d+_CloudTrail(|-Digest|-Insight)_\w{2}(|-gov|-cn)-\w{4,9}-\d_(|.+)\d{8}T\d{4,6}Z(|.+)\.json\.gz$`)
ec2InstanceRegexp = regexp.MustCompile(`^arn:aws:sts::.*?:assumed-role/(?P<role>.*?)/(?P<host>i-([0-9a-f]{8}|[0-9a-f]{17}))$`)
)

func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] {
return func(yield func(string, error) bool) {
gz, err := gzip.NewReader(r)
if err != nil {
yield("", fmt.Errorf("decode cloudtrail gzip: %w", err))
return
}
defer gz.Close() //nolint:errcheck

dec := json.NewDecoder(gz)

if t, err := dec.Token(); err != nil || t != json.Delim('{') {
yield("", errors.New("decode cloudtrail: expected '{' at start of JSON"))
return
}
if t, err := dec.Token(); err != nil || t != "Records" {
yield("", errors.New("decode cloudtrail: expected 'Records' key"))
return
}
if t, err := dec.Token(); err != nil || t != json.Delim('[') {
yield("", errors.New("decode cloudtrail: expected '[' at start of Records array"))
return
}

for dec.More() {
var raw json.RawMessage
if err := dec.Decode(&raw); err != nil {
yield("", fmt.Errorf("decode cloudtrail record: %w", err))
return
}
if !yield(string(raw), nil) {
return
}
}
}
}

func cloudtrailHost(message string) string {
dec := json.NewDecoder(strings.NewReader(message))
if t, err := dec.Token(); err != nil || t != json.Delim('{') {
return ""
}

for dec.More() {
key, err := dec.Token()
if err != nil {
return ""
}
if key != cloudTrailUserIdentityKey {
var skip json.RawMessage
if err := dec.Decode(&skip); err != nil {
return ""
}
continue
}

if t, err := dec.Token(); err != nil || t != json.Delim('{') {
return ""
}

for dec.More() {
innerKey, err := dec.Token()
if err != nil {
return ""
}
if innerKey != cloudTrailARNKey {
var skip json.RawMessage
if err := dec.Decode(&skip); err != nil {
return ""
}
continue
}

var arn string
if err := dec.Decode(&arn); err != nil {
return ""
}

matches := ec2InstanceRegexp.FindStringSubmatch(arn)
if matches == nil {
slog.Debug(arn + " arn did not match an EC2 host instance, cloudtrail host extraction skipped")
return ""
}
return matches[ec2InstanceRegexp.SubexpIndex("host")]
}
return ""
}
return ""
}
189 changes: 189 additions & 0 deletions aws/logs_monitoring_go/internal/handling/cloudtrail_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package handling

import (
"bytes"
"testing"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil"
"github.com/google/go-cmp/cmp"
)

func TestCloudTrailRegex(t *testing.T) {
t.Parallel()

tests := map[string]struct {
key string
want bool
}{
"standard cloudtrail": {
key: "601427279990_CloudTrail_us-east-1_20210503T0000Z_QrttGEk4ZcBTLwj5.json.gz",
want: true,
},
"cloudtrail digest": {
key: "601427279990_CloudTrail-Digest_us-east-1_20210503T0000Z_digest.json.gz",
want: true,
},
"cloudtrail insight": {
key: "601427279990_CloudTrail-Insight_us-east-1_20210503T0000Z_insight.json.gz",
want: true,
},
"gov region": {
key: "601427279990_CloudTrail_us-gov-west-1_20210503T0000Z_abc.json.gz",
want: true,
},
"cn region": {
key: "601427279990_CloudTrail_cn-north-1_20210503T0000Z_abc.json.gz",
want: true,
},
"not cloudtrail": {
key: "some-random-log-file.json.gz",
want: false,
},
"waf log": {
key: "aws-waf-logs-something.json.gz",
want: false,
},
"plain text file": {
key: "access.log",
want: false,
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()
got := cloudTrailRegex.MatchString(tc.key)
if got != tc.want {
t.Errorf("cloudTrailRegex().MatchString(%q) = %v, want %v", tc.key, got, tc.want)
}
})
}
}

func TestCloudtrailHost(t *testing.T) {
t.Parallel()

tests := map[string]struct {
message string
want string
}{
"ec2 instance (17)": {
message: `{"userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`,
want: "i-08014e4f62ccf762d",
},
"ec2 instance (8)": {
message: `{"userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-abcd1234"}}`,
want: "i-abcd1234",
},
"non ec2 arn": {
message: `{"userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/my-session"}}`,
want: "",
},
"missing userIdentity": {
message: `{"eventName":"DescribeTable"}`,
want: "",
},
"missing arn": {
message: `{"userIdentity":{"type":"AssumedRole"}}`,
want: "",
},
"invalid json": {
message: "not json",
want: "",
},
"empty message": {
message: "",
want: "",
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()
if got := cloudtrailHost(tc.message); got != tc.want {
t.Errorf("want %q, got %q", tc.want, got)
}
})
}
}

func TestDecodeCloudTrail(t *testing.T) {
t.Parallel()

tests := map[string]struct {
input []byte
want []string
wantErr bool
}{
"single record": {
input: testutil.MustGzipJSON(t, map[string]any{
"Records": []any{
map[string]any{
"eventName": "DescribeTable",
"userIdentity": map[string]any{
"arn": "arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d",
},
},
},
}),
want: []string{
`{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`,
},
},
"multiple records": {
input: testutil.MustGzipJSON(t, map[string]any{
"Records": []any{
map[string]any{"eventName": "event1"},
map[string]any{"eventName": "event2"},
},
}),
want: []string{
`{"eventName":"event1"}`,
`{"eventName":"event2"}`,
},
},
"empty records array": {
input: testutil.MustGzipJSON(t, map[string]any{
"Records": []any{},
}),
want: nil,
},
"invalid gzip": {
input: []byte("not gzip"),
wantErr: true,
},
"invalid json": {
input: testutil.MustGzipJSON(t, "not an object"),
wantErr: true,
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()

var got []string
for msg, err := range decodeCloudTrail(bytes.NewReader(tc.input)) {
if err != nil {
if !tc.wantErr {
t.Fatalf("unexpected error: %v", err)
}
return
}
got = append(got, msg)
}

if tc.wantErr {
t.Fatal("expected error, got none")
}
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("mismatch (-want +got):\n%s", diff)
}
})
}
}
7 changes: 6 additions & 1 deletion aws/logs_monitoring_go/internal/handling/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewCloudwatch(cfg *config.Config) *CloudwatchHandler {
}
}

func (h CloudwatchHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
func (h *CloudwatchHandler) Handle(ctx context.Context, event json.RawMessage, out chan<- model.LogEntry) error {
var cwEvent events.CloudwatchLogsEvent
if err := json.Unmarshal(event, &cwEvent); err != nil {
return fmt.Errorf("unmarshal: %w", err)
Expand Down Expand Up @@ -134,6 +134,11 @@ func (h CloudwatchHandler) newCloudwatchLogEntry(event events.CloudwatchLogsLogE
entry.Message = message
entry.ID = event.ID
entry.Timestamp = event.Timestamp

if entry.Source == sourceCloudtrail {
entry.Host = cloudtrailHost(event.Message)
}

return entry
}

Expand Down
Loading