Skip to content

Commit 3270f73

Browse files
committed
[AWSINTS-3594] feat(go-forwarder): add CloudTrail
1 parent b14650c commit 3270f73

7 files changed

Lines changed: 558 additions & 44 deletions

File tree

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package handling
7+
8+
import (
9+
"compress/gzip"
10+
"encoding/json"
11+
"errors"
12+
"fmt"
13+
"io"
14+
"iter"
15+
"log/slog"
16+
"regexp"
17+
"sync"
18+
)
19+
20+
const (
21+
cloudTrailARNKey = "arn"
22+
cloudTrailUserIdentityKey = "userIdentity"
23+
)
24+
25+
var cloudTrailRegex = sync.OnceValue(func() *regexp.Regexp {
26+
return regexp.MustCompile(`\d+_CloudTrail(|-Digest|-Insight)_\w{2}(|-gov|-cn)-\w{4,9}-\d_(|.+)\d{8}T\d{4,6}Z(|.+)\.json\.gz$`)
27+
})
28+
29+
var ec2InstanceRegexp = sync.OnceValue(func() *regexp.Regexp {
30+
return regexp.MustCompile(`^arn:aws:sts::.*?:assumed-role/(?P<role>.*?)/(?P<host>i-([0-9a-f]{8}|[0-9a-f]{17}))$`)
31+
})
32+
33+
func decodeCloudTrail(r io.Reader) iter.Seq[s3Record] {
34+
return func(yield func(s3Record) bool) {
35+
gz, err := gzip.NewReader(r)
36+
if err != nil {
37+
yield(s3Record{Err: fmt.Errorf("decode cloudtrail gzip: %w", err)})
38+
return
39+
}
40+
defer gz.Close() //nolint:errcheck
41+
42+
dec := json.NewDecoder(gz)
43+
44+
if t, err := dec.Token(); err != nil || t != json.Delim('{') {
45+
yield(s3Record{Err: errors.New("decode cloudtrail: expected '{' at start of JSON")})
46+
return
47+
}
48+
if t, err := dec.Token(); err != nil || t != "Records" {
49+
yield(s3Record{Err: errors.New("decode cloudtrail: expected 'Records' key")})
50+
return
51+
}
52+
if t, err := dec.Token(); err != nil || t != json.Delim('[') {
53+
yield(s3Record{Err: errors.New("decode cloudtrail: expected '[' at start of Records array")})
54+
return
55+
}
56+
57+
for dec.More() {
58+
var record map[string]any
59+
if err := dec.Decode(&record); err != nil {
60+
yield(s3Record{Err: fmt.Errorf("decode cloudtrail record: %w", err)})
61+
return
62+
}
63+
64+
msg, err := json.Marshal(record)
65+
if err != nil {
66+
yield(s3Record{Err: fmt.Errorf("marshal cloudtrail record: %w", err)})
67+
return
68+
}
69+
70+
host := cloudtrailHost(record)
71+
if !yield(s3Record{Message: string(msg), Host: host}) {
72+
return
73+
}
74+
}
75+
}
76+
}
77+
78+
func cloudtrailHostFromMessage(message string) string {
79+
var record map[string]any
80+
if err := json.Unmarshal([]byte(message), &record); err != nil {
81+
return ""
82+
}
83+
return cloudtrailHost(record)
84+
}
85+
86+
func cloudtrailHost(record map[string]any) string {
87+
ui, ok := record[cloudTrailUserIdentityKey].(map[string]any)
88+
if !ok {
89+
slog.Debug(cloudTrailUserIdentityKey + " key not found, cloudtrail host extraction skipped")
90+
return ""
91+
}
92+
arn, ok := ui[cloudTrailARNKey].(string)
93+
if !ok {
94+
slog.Debug(cloudTrailARNKey + " key not found, cloudtrail host extraction skipped")
95+
return ""
96+
}
97+
98+
re := ec2InstanceRegexp()
99+
matches := re.FindStringSubmatch(arn)
100+
if matches == nil {
101+
slog.Debug(arn + " arn did not match, cloudtrail host extraction skipped")
102+
return ""
103+
}
104+
105+
return matches[re.SubexpIndex("host")]
106+
}
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package handling
7+
8+
import (
9+
"bytes"
10+
"testing"
11+
12+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil"
13+
"github.com/google/go-cmp/cmp"
14+
)
15+
16+
func TestCloudTrailRegex(t *testing.T) {
17+
t.Parallel()
18+
19+
tests := map[string]struct {
20+
key string
21+
want bool
22+
}{
23+
"standard cloudtrail": {
24+
key: "601427279990_CloudTrail_us-east-1_20210503T0000Z_QrttGEk4ZcBTLwj5.json.gz",
25+
want: true,
26+
},
27+
"cloudtrail digest": {
28+
key: "601427279990_CloudTrail-Digest_us-east-1_20210503T0000Z_digest.json.gz",
29+
want: true,
30+
},
31+
"cloudtrail insight": {
32+
key: "601427279990_CloudTrail-Insight_us-east-1_20210503T0000Z_insight.json.gz",
33+
want: true,
34+
},
35+
"gov region": {
36+
key: "601427279990_CloudTrail_us-gov-west-1_20210503T0000Z_abc.json.gz",
37+
want: true,
38+
},
39+
"cn region": {
40+
key: "601427279990_CloudTrail_cn-north-1_20210503T0000Z_abc.json.gz",
41+
want: true,
42+
},
43+
"not cloudtrail": {
44+
key: "some-random-log-file.json.gz",
45+
want: false,
46+
},
47+
"waf log": {
48+
key: "aws-waf-logs-something.json.gz",
49+
want: false,
50+
},
51+
"plain text file": {
52+
key: "access.log",
53+
want: false,
54+
},
55+
}
56+
57+
for name, tc := range tests {
58+
t.Run(name, func(t *testing.T) {
59+
t.Parallel()
60+
got := cloudTrailRegex().MatchString(tc.key)
61+
if got != tc.want {
62+
t.Errorf("cloudTrailRegex().MatchString(%q) = %v, want %v", tc.key, got, tc.want)
63+
}
64+
})
65+
}
66+
}
67+
68+
func TestCloudtrailHost(t *testing.T) {
69+
t.Parallel()
70+
71+
tests := map[string]struct {
72+
record map[string]any
73+
want string
74+
}{
75+
"ec2 instance (17)": {
76+
record: map[string]any{
77+
"userIdentity": map[string]any{
78+
"arn": "arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d",
79+
},
80+
},
81+
want: "i-08014e4f62ccf762d",
82+
},
83+
"ec2 instance (8)": {
84+
record: map[string]any{
85+
"userIdentity": map[string]any{
86+
"arn": "arn:aws:sts::601427279990:assumed-role/MyRole/i-abcd1234",
87+
},
88+
},
89+
want: "i-abcd1234",
90+
},
91+
"non ec2 arn": {
92+
record: map[string]any{
93+
"userIdentity": map[string]any{
94+
"arn": "arn:aws:sts::601427279990:assumed-role/MyRole/my-session",
95+
},
96+
},
97+
want: "",
98+
},
99+
"missing userIdentity": {
100+
record: map[string]any{"eventName": "DescribeTable"},
101+
want: "",
102+
},
103+
"missing arn": {
104+
record: map[string]any{
105+
"userIdentity": map[string]any{
106+
"type": "AssumedRole",
107+
},
108+
},
109+
want: "",
110+
},
111+
}
112+
113+
for name, tc := range tests {
114+
t.Run(name, func(t *testing.T) {
115+
t.Parallel()
116+
host := cloudtrailHost(tc.record)
117+
if host != tc.want {
118+
t.Errorf("want %q, got %q", tc.want, host)
119+
}
120+
})
121+
}
122+
}
123+
124+
func TestDecodeCloudTrail(t *testing.T) {
125+
t.Parallel()
126+
127+
tests := map[string]struct {
128+
input []byte
129+
want []s3Record
130+
wantErr bool
131+
}{
132+
"single record with ec2 host": {
133+
input: testutil.MustGzipJSON(t, map[string]any{
134+
"Records": []any{
135+
map[string]any{
136+
"eventName": "DescribeTable",
137+
"userIdentity": map[string]any{
138+
"arn": "arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d",
139+
},
140+
},
141+
},
142+
}),
143+
want: []s3Record{
144+
{
145+
Message: `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`,
146+
Host: "i-08014e4f62ccf762d",
147+
},
148+
},
149+
},
150+
"single record without ec2 host": {
151+
input: testutil.MustGzipJSON(t, map[string]any{
152+
"Records": []any{
153+
map[string]any{
154+
"eventName": "DescribeTable",
155+
"userIdentity": map[string]any{
156+
"arn": "arn:aws:iam::601427279990:user/admin",
157+
},
158+
},
159+
},
160+
}),
161+
want: []s3Record{
162+
{
163+
Message: `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:iam::601427279990:user/admin"}}`,
164+
Host: "",
165+
},
166+
},
167+
},
168+
"multiple records": {
169+
input: testutil.MustGzipJSON(t, map[string]any{
170+
"Records": []any{
171+
map[string]any{"eventName": "event1"},
172+
map[string]any{"eventName": "event2"},
173+
},
174+
}),
175+
want: []s3Record{
176+
{Message: `{"eventName":"event1"}`},
177+
{Message: `{"eventName":"event2"}`},
178+
},
179+
},
180+
"empty records array": {
181+
input: testutil.MustGzipJSON(t, map[string]any{
182+
"Records": []any{},
183+
}),
184+
want: nil,
185+
},
186+
"invalid gzip": {
187+
input: []byte("not gzip"),
188+
wantErr: true,
189+
},
190+
"invalid json": {
191+
input: testutil.MustGzipJSON(t, "not an object"),
192+
wantErr: true,
193+
},
194+
}
195+
196+
for name, tc := range tests {
197+
t.Run(name, func(t *testing.T) {
198+
t.Parallel()
199+
200+
var got []s3Record
201+
for rec := range decodeCloudTrail(bytes.NewReader(tc.input)) {
202+
if rec.Err != nil {
203+
if !tc.wantErr {
204+
t.Fatalf("unexpected error: %v", rec.Err)
205+
}
206+
return
207+
}
208+
got = append(got, rec)
209+
}
210+
211+
if tc.wantErr {
212+
t.Fatal("expected error, got none")
213+
}
214+
if diff := cmp.Diff(tc.want, got); diff != "" {
215+
t.Errorf("mismatch (-want +got):\n%s", diff)
216+
}
217+
})
218+
}
219+
}

aws/logs_monitoring_go/internal/handling/cloudwatch.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,13 @@ func (h CloudwatchHandler) newCloudwatchLogEntry(event events.CloudwatchLogsLogE
134134
entry.Message = message
135135
entry.ID = event.ID
136136
entry.Timestamp = event.Timestamp
137+
138+
if entry.Source == sourceCloudtrail {
139+
if host := cloudtrailHostFromMessage(event.Message); host != "" {
140+
entry.Host = host
141+
}
142+
}
143+
137144
return entry
138145
}
139146

0 commit comments

Comments
 (0)