Skip to content

Commit c518f8b

Browse files
ndakkouneViBiOh
andauthored
[AWSINTS-3594] feat(go-forwarder): add CloudTrail (#1116)
Co-authored-by: Vincent Boutour <vincent.boutour@datadoghq.com>
1 parent d47d51d commit c518f8b

7 files changed

Lines changed: 540 additions & 52 deletions

File tree

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package handling
7+
8+
import (
9+
"compress/gzip"
10+
"encoding/json"
11+
"errors"
12+
"fmt"
13+
"io"
14+
"iter"
15+
"log/slog"
16+
"regexp"
17+
"strings"
18+
)
19+
20+
const (
21+
cloudTrailARNKey = "arn"
22+
cloudTrailUserIdentityKey = "userIdentity"
23+
)
24+
25+
var (
26+
cloudTrailRegex = regexp.MustCompile(`\d+_CloudTrail(|-Digest|-Insight)_\w{2}(|-gov|-cn)-\w{4,9}-\d_(|.+)\d{8}T\d{4,6}Z(|.+)\.json\.gz$`)
27+
ec2InstanceRegexp = regexp.MustCompile(`^arn:aws:sts::.*?:assumed-role/(?P<role>.*?)/(?P<host>i-([0-9a-f]{8}|[0-9a-f]{17}))$`)
28+
)
29+
30+
func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] {
31+
return func(yield func(string, error) bool) {
32+
gz, err := gzip.NewReader(r)
33+
if err != nil {
34+
yield("", fmt.Errorf("decode cloudtrail gzip: %w", err))
35+
return
36+
}
37+
defer gz.Close() //nolint:errcheck
38+
39+
dec := json.NewDecoder(gz)
40+
41+
if t, err := dec.Token(); err != nil || t != json.Delim('{') {
42+
yield("", errors.New("decode cloudtrail: expected '{' at start of JSON"))
43+
return
44+
}
45+
if t, err := dec.Token(); err != nil || t != "Records" {
46+
yield("", errors.New("decode cloudtrail: expected 'Records' key"))
47+
return
48+
}
49+
if t, err := dec.Token(); err != nil || t != json.Delim('[') {
50+
yield("", errors.New("decode cloudtrail: expected '[' at start of Records array"))
51+
return
52+
}
53+
54+
for dec.More() {
55+
var raw json.RawMessage
56+
if err := dec.Decode(&raw); err != nil {
57+
yield("", fmt.Errorf("decode cloudtrail record: %w", err))
58+
return
59+
}
60+
if !yield(string(raw), nil) {
61+
return
62+
}
63+
}
64+
}
65+
}
66+
67+
func cloudtrailHost(message string) string {
68+
dec := json.NewDecoder(strings.NewReader(message))
69+
if t, err := dec.Token(); err != nil || t != json.Delim('{') {
70+
return ""
71+
}
72+
73+
for dec.More() {
74+
key, err := dec.Token()
75+
if err != nil {
76+
return ""
77+
}
78+
if key != cloudTrailUserIdentityKey {
79+
var skip json.RawMessage
80+
if err := dec.Decode(&skip); err != nil {
81+
return ""
82+
}
83+
continue
84+
}
85+
86+
if t, err := dec.Token(); err != nil || t != json.Delim('{') {
87+
return ""
88+
}
89+
90+
for dec.More() {
91+
innerKey, err := dec.Token()
92+
if err != nil {
93+
return ""
94+
}
95+
if innerKey != cloudTrailARNKey {
96+
var skip json.RawMessage
97+
if err := dec.Decode(&skip); err != nil {
98+
return ""
99+
}
100+
continue
101+
}
102+
103+
var arn string
104+
if err := dec.Decode(&arn); err != nil {
105+
return ""
106+
}
107+
108+
matches := ec2InstanceRegexp.FindStringSubmatch(arn)
109+
if matches == nil {
110+
slog.Debug(arn + " arn did not match an EC2 host instance, cloudtrail host extraction skipped")
111+
return ""
112+
}
113+
return matches[ec2InstanceRegexp.SubexpIndex("host")]
114+
}
115+
return ""
116+
}
117+
return ""
118+
}
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package handling
7+
8+
import (
9+
"bytes"
10+
"testing"
11+
12+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/testutil"
13+
"github.com/google/go-cmp/cmp"
14+
)
15+
16+
func TestCloudTrailRegex(t *testing.T) {
17+
t.Parallel()
18+
19+
tests := map[string]struct {
20+
key string
21+
want bool
22+
}{
23+
"standard cloudtrail": {
24+
key: "601427279990_CloudTrail_us-east-1_20210503T0000Z_QrttGEk4ZcBTLwj5.json.gz",
25+
want: true,
26+
},
27+
"cloudtrail digest": {
28+
key: "601427279990_CloudTrail-Digest_us-east-1_20210503T0000Z_digest.json.gz",
29+
want: true,
30+
},
31+
"cloudtrail insight": {
32+
key: "601427279990_CloudTrail-Insight_us-east-1_20210503T0000Z_insight.json.gz",
33+
want: true,
34+
},
35+
"gov region": {
36+
key: "601427279990_CloudTrail_us-gov-west-1_20210503T0000Z_abc.json.gz",
37+
want: true,
38+
},
39+
"cn region": {
40+
key: "601427279990_CloudTrail_cn-north-1_20210503T0000Z_abc.json.gz",
41+
want: true,
42+
},
43+
"not cloudtrail": {
44+
key: "some-random-log-file.json.gz",
45+
want: false,
46+
},
47+
"waf log": {
48+
key: "aws-waf-logs-something.json.gz",
49+
want: false,
50+
},
51+
"plain text file": {
52+
key: "access.log",
53+
want: false,
54+
},
55+
}
56+
57+
for name, tc := range tests {
58+
t.Run(name, func(t *testing.T) {
59+
t.Parallel()
60+
got := cloudTrailRegex.MatchString(tc.key)
61+
if got != tc.want {
62+
t.Errorf("cloudTrailRegex().MatchString(%q) = %v, want %v", tc.key, got, tc.want)
63+
}
64+
})
65+
}
66+
}
67+
68+
func TestCloudtrailHost(t *testing.T) {
69+
t.Parallel()
70+
71+
tests := map[string]struct {
72+
message string
73+
want string
74+
}{
75+
"ec2 instance (17)": {
76+
message: `{"userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`,
77+
want: "i-08014e4f62ccf762d",
78+
},
79+
"ec2 instance (8)": {
80+
message: `{"userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-abcd1234"}}`,
81+
want: "i-abcd1234",
82+
},
83+
"non ec2 arn": {
84+
message: `{"userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/my-session"}}`,
85+
want: "",
86+
},
87+
"missing userIdentity": {
88+
message: `{"eventName":"DescribeTable"}`,
89+
want: "",
90+
},
91+
"missing arn": {
92+
message: `{"userIdentity":{"type":"AssumedRole"}}`,
93+
want: "",
94+
},
95+
"invalid json": {
96+
message: "not json",
97+
want: "",
98+
},
99+
"empty message": {
100+
message: "",
101+
want: "",
102+
},
103+
}
104+
105+
for name, tc := range tests {
106+
t.Run(name, func(t *testing.T) {
107+
t.Parallel()
108+
if got := cloudtrailHost(tc.message); got != tc.want {
109+
t.Errorf("want %q, got %q", tc.want, got)
110+
}
111+
})
112+
}
113+
}
114+
115+
func TestDecodeCloudTrail(t *testing.T) {
116+
t.Parallel()
117+
118+
tests := map[string]struct {
119+
input []byte
120+
want []string
121+
wantErr bool
122+
}{
123+
"single record": {
124+
input: testutil.MustGzipJSON(t, map[string]any{
125+
"Records": []any{
126+
map[string]any{
127+
"eventName": "DescribeTable",
128+
"userIdentity": map[string]any{
129+
"arn": "arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d",
130+
},
131+
},
132+
},
133+
}),
134+
want: []string{
135+
`{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`,
136+
},
137+
},
138+
"multiple records": {
139+
input: testutil.MustGzipJSON(t, map[string]any{
140+
"Records": []any{
141+
map[string]any{"eventName": "event1"},
142+
map[string]any{"eventName": "event2"},
143+
},
144+
}),
145+
want: []string{
146+
`{"eventName":"event1"}`,
147+
`{"eventName":"event2"}`,
148+
},
149+
},
150+
"empty records array": {
151+
input: testutil.MustGzipJSON(t, map[string]any{
152+
"Records": []any{},
153+
}),
154+
want: nil,
155+
},
156+
"invalid gzip": {
157+
input: []byte("not gzip"),
158+
wantErr: true,
159+
},
160+
"invalid json": {
161+
input: testutil.MustGzipJSON(t, "not an object"),
162+
wantErr: true,
163+
},
164+
}
165+
166+
for name, tc := range tests {
167+
t.Run(name, func(t *testing.T) {
168+
t.Parallel()
169+
170+
var got []string
171+
for msg, err := range decodeCloudTrail(bytes.NewReader(tc.input)) {
172+
if err != nil {
173+
if !tc.wantErr {
174+
t.Fatalf("unexpected error: %v", err)
175+
}
176+
return
177+
}
178+
got = append(got, msg)
179+
}
180+
181+
if tc.wantErr {
182+
t.Fatal("expected error, got none")
183+
}
184+
if diff := cmp.Diff(tc.want, got); diff != "" {
185+
t.Errorf("mismatch (-want +got):\n%s", diff)
186+
}
187+
})
188+
}
189+
}

aws/logs_monitoring_go/internal/handling/cloudwatch.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ func (h CloudwatchHandler) newCloudwatchLogEntry(event events.CloudwatchLogsLogE
134134
entry.Message = message
135135
entry.ID = event.ID
136136
entry.Timestamp = event.Timestamp
137+
138+
if entry.Source == sourceCloudtrail {
139+
entry.Host = cloudtrailHost(event.Message)
140+
}
141+
137142
return entry
138143
}
139144

aws/logs_monitoring_go/internal/handling/cloudwatch_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,78 @@ func TestCloudwatchHandler_Handle(t *testing.T) {
118118
},
119119
},
120120
},
121+
"cloudtrail with ec2 host": {
122+
event: testutil.MustCloudwatchEvent(t, testutil.MustGzipJSON(t, map[string]any{
123+
"messageType": "DATA_MESSAGE",
124+
"owner": "601427279990",
125+
"logGroup": "cloudtrail-logs",
126+
"logStream": "601427279990_CloudTrail_us-east-1",
127+
"logEvents": []map[string]any{
128+
{
129+
"id": "ct1",
130+
"timestamp": 1620000000000,
131+
"message": `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`,
132+
},
133+
},
134+
})),
135+
config: testutil.EmptyConfig(),
136+
chanSize: 1,
137+
want: []model.LogEntry{
138+
{
139+
Message: `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`,
140+
Source: "cloudtrail",
141+
SourceCategory: "aws",
142+
Service: "cloudtrail",
143+
Host: "i-08014e4f62ccf762d",
144+
ID: "ct1",
145+
Timestamp: 1620000000000,
146+
Metadata: model.CloudwatchMetadata{
147+
LambdaOrigin: model.LambdaOrigin{ARN: testutil.ARN},
148+
Origin: model.CloudwatchOrigin{
149+
LogGroup: "cloudtrail-logs",
150+
LogStream: "601427279990_CloudTrail_us-east-1",
151+
Owner: "601427279990",
152+
},
153+
},
154+
},
155+
},
156+
},
157+
"cloudtrail without ec2 host": {
158+
event: testutil.MustCloudwatchEvent(t, testutil.MustGzipJSON(t, map[string]any{
159+
"messageType": "DATA_MESSAGE",
160+
"owner": "601427279990",
161+
"logGroup": "cloudtrail-logs",
162+
"logStream": "601427279990_CloudTrail_us-east-1",
163+
"logEvents": []map[string]any{
164+
{
165+
"id": "ct2",
166+
"timestamp": 1620000000000,
167+
"message": `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:iam::601427279990:user/admin"}}`,
168+
},
169+
},
170+
})),
171+
config: testutil.EmptyConfig(),
172+
chanSize: 1,
173+
want: []model.LogEntry{
174+
{
175+
Message: `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:iam::601427279990:user/admin"}}`,
176+
Source: "cloudtrail",
177+
SourceCategory: "aws",
178+
Service: "cloudtrail",
179+
Host: "",
180+
ID: "ct2",
181+
Timestamp: 1620000000000,
182+
Metadata: model.CloudwatchMetadata{
183+
LambdaOrigin: model.LambdaOrigin{ARN: testutil.ARN},
184+
Origin: model.CloudwatchOrigin{
185+
LogGroup: "cloudtrail-logs",
186+
LogStream: "601427279990_CloudTrail_us-east-1",
187+
Owner: "601427279990",
188+
},
189+
},
190+
},
191+
},
192+
},
121193
"config overrides source, host, service and tags": {
122194
event: testutil.MustCloudwatchEvent(t, testutil.MustGzipJSON(t, map[string]any{
123195
"messageType": "DATA_MESSAGE",

0 commit comments

Comments
 (0)