Skip to content

Commit b341756

Browse files
committed
Delete s3Record type and simplify cloudtrail logic
1 parent 2e33366 commit b341756

5 files changed

Lines changed: 104 additions & 118 deletions

File tree

aws/logs_monitoring_go/internal/handling/cloudtrail.go

Lines changed: 57 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@ import (
1414
"iter"
1515
"log/slog"
1616
"regexp"
17+
"strings"
1718
"sync"
1819
)
1920

@@ -30,77 +31,93 @@ var ec2InstanceRegexp = sync.OnceValue(func() *regexp.Regexp {
3031
return regexp.MustCompile(`^arn:aws:sts::.*?:assumed-role/(?P<role>.*?)/(?P<host>i-([0-9a-f]{8}|[0-9a-f]{17}))$`)
3132
})
3233

33-
func decodeCloudTrail(r io.Reader) iter.Seq2[s3Record, error] {
34-
return func(yield func(s3Record, error) bool) {
34+
func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] {
35+
return func(yield func(string, error) bool) {
3536
gz, err := gzip.NewReader(r)
3637
if err != nil {
37-
yield(s3Record{}, fmt.Errorf("decode cloudtrail gzip: %w", err))
38+
yield("", fmt.Errorf("decode cloudtrail gzip: %w", err))
3839
return
3940
}
4041
defer gz.Close() //nolint:errcheck
4142

4243
dec := json.NewDecoder(gz)
4344

4445
if t, err := dec.Token(); err != nil || t != json.Delim('{') {
45-
yield(s3Record{}, errors.New("decode cloudtrail: expected '{' at start of JSON"))
46+
yield("", errors.New("decode cloudtrail: expected '{' at start of JSON"))
4647
return
4748
}
4849
if t, err := dec.Token(); err != nil || t != "Records" {
49-
yield(s3Record{}, errors.New("decode cloudtrail: expected 'Records' key"))
50+
yield("", errors.New("decode cloudtrail: expected 'Records' key"))
5051
return
5152
}
5253
if t, err := dec.Token(); err != nil || t != json.Delim('[') {
53-
yield(s3Record{}, errors.New("decode cloudtrail: expected '[' at start of Records array"))
54+
yield("", errors.New("decode cloudtrail: expected '[' at start of Records array"))
5455
return
5556
}
5657

5758
for dec.More() {
58-
var record map[string]any
59-
if err := dec.Decode(&record); err != nil {
60-
yield(s3Record{}, fmt.Errorf("decode cloudtrail record: %w", err))
59+
var raw json.RawMessage
60+
if err := dec.Decode(&raw); err != nil {
61+
yield("", fmt.Errorf("decode cloudtrail record: %w", err))
6162
return
6263
}
63-
64-
msg, err := json.Marshal(record)
65-
if err != nil {
66-
yield(s3Record{}, fmt.Errorf("marshal cloudtrail record: %w", err))
67-
return
68-
}
69-
70-
host := cloudtrailHost(record)
71-
if !yield(s3Record{Message: string(msg), Host: host}, nil) {
64+
if !yield(string(raw), nil) {
7265
return
7366
}
7467
}
7568
}
7669
}
7770

78-
func cloudtrailHostFromMessage(message string) string {
79-
var record map[string]any
80-
if err := json.Unmarshal([]byte(message), &record); err != nil {
71+
func cloudtrailHost(message string) string {
72+
dec := json.NewDecoder(strings.NewReader(message))
73+
if t, err := dec.Token(); err != nil || t != json.Delim('{') {
8174
return ""
8275
}
83-
return cloudtrailHost(record)
84-
}
8576

86-
func cloudtrailHost(record map[string]any) string {
87-
ui, ok := record[cloudTrailUserIdentityKey].(map[string]any)
88-
if !ok {
89-
slog.Debug(cloudTrailUserIdentityKey + " key not found, cloudtrail host extraction skipped")
90-
return ""
91-
}
92-
arn, ok := ui[cloudTrailARNKey].(string)
93-
if !ok {
94-
slog.Debug(cloudTrailARNKey + " key not found, cloudtrail host extraction skipped")
95-
return ""
96-
}
77+
for dec.More() {
78+
key, err := dec.Token()
79+
if err != nil {
80+
return ""
81+
}
82+
if key != cloudTrailUserIdentityKey {
83+
var skip json.RawMessage
84+
if err := dec.Decode(&skip); err != nil {
85+
return ""
86+
}
87+
continue
88+
}
9789

98-
re := ec2InstanceRegexp()
99-
matches := re.FindStringSubmatch(arn)
100-
if matches == nil {
101-
slog.Debug(arn + " arn did not match an EC2 host instance, cloudtrail host extraction skipped")
90+
if t, err := dec.Token(); err != nil || t != json.Delim('{') {
91+
return ""
92+
}
93+
94+
for dec.More() {
95+
innerKey, err := dec.Token()
96+
if err != nil {
97+
return ""
98+
}
99+
if innerKey != cloudTrailARNKey {
100+
var skip json.RawMessage
101+
if err := dec.Decode(&skip); err != nil {
102+
return ""
103+
}
104+
continue
105+
}
106+
107+
var arn string
108+
if err := dec.Decode(&arn); err != nil {
109+
return ""
110+
}
111+
112+
re := ec2InstanceRegexp()
113+
matches := re.FindStringSubmatch(arn)
114+
if matches == nil {
115+
slog.Debug(arn + " arn did not match an EC2 host instance, cloudtrail host extraction skipped")
116+
return ""
117+
}
118+
return matches[re.SubexpIndex("host")]
119+
}
102120
return ""
103121
}
104-
105-
return matches[re.SubexpIndex("host")]
122+
return ""
106123
}

aws/logs_monitoring_go/internal/handling/cloudtrail_test.go

Lines changed: 32 additions & 62 deletions
Original file line number | Diff line number | Diff line change
@@ -69,53 +69,44 @@ func TestCloudtrailHost(t *testing.T) {
6969
t.Parallel()
7070

7171
tests := map[string]struct {
72-
record map[string]any
73-
want string
72+
message string
73+
want string
7474
}{
7575
"ec2 instance (17)": {
76-
record: map[string]any{
77-
"userIdentity": map[string]any{
78-
"arn": "arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d",
79-
},
80-
},
81-
want: "i-08014e4f62ccf762d",
76+
message: `{"userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`,
77+
want: "i-08014e4f62ccf762d",
8278
},
8379
"ec2 instance (8)": {
84-
record: map[string]any{
85-
"userIdentity": map[string]any{
86-
"arn": "arn:aws:sts::601427279990:assumed-role/MyRole/i-abcd1234",
87-
},
88-
},
89-
want: "i-abcd1234",
80+
message: `{"userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-abcd1234"}}`,
81+
want: "i-abcd1234",
9082
},
9183
"non ec2 arn": {
92-
record: map[string]any{
93-
"userIdentity": map[string]any{
94-
"arn": "arn:aws:sts::601427279990:assumed-role/MyRole/my-session",
95-
},
96-
},
97-
want: "",
84+
message: `{"userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/my-session"}}`,
85+
want: "",
9886
},
9987
"missing userIdentity": {
100-
record: map[string]any{"eventName": "DescribeTable"},
101-
want: "",
88+
message: `{"eventName":"DescribeTable"}`,
89+
want: "",
10290
},
10391
"missing arn": {
104-
record: map[string]any{
105-
"userIdentity": map[string]any{
106-
"type": "AssumedRole",
107-
},
108-
},
109-
want: "",
92+
message: `{"userIdentity":{"type":"AssumedRole"}}`,
93+
want: "",
94+
},
95+
"invalid json": {
96+
message: "not json",
97+
want: "",
98+
},
99+
"empty message": {
100+
message: "",
101+
want: "",
110102
},
111103
}
112104

113105
for name, tc := range tests {
114106
t.Run(name, func(t *testing.T) {
115107
t.Parallel()
116-
host := cloudtrailHost(tc.record)
117-
if host != tc.want {
118-
t.Errorf("want %q, got %q", tc.want, host)
108+
if got := cloudtrailHost(tc.message); got != tc.want {
109+
t.Errorf("want %q, got %q", tc.want, got)
119110
}
120111
})
121112
}
@@ -126,10 +117,10 @@ func TestDecodeCloudTrail(t *testing.T) {
126117

127118
tests := map[string]struct {
128119
input []byte
129-
want []s3Record
120+
want []string
130121
wantErr bool
131122
}{
132-
"single record with ec2 host": {
123+
"single record": {
133124
input: testutil.MustGzipJSON(t, map[string]any{
134125
"Records": []any{
135126
map[string]any{
@@ -140,29 +131,8 @@ func TestDecodeCloudTrail(t *testing.T) {
140131
},
141132
},
142133
}),
143-
want: []s3Record{
144-
{
145-
Message: `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`,
146-
Host: "i-08014e4f62ccf762d",
147-
},
148-
},
149-
},
150-
"single record without ec2 host": {
151-
input: testutil.MustGzipJSON(t, map[string]any{
152-
"Records": []any{
153-
map[string]any{
154-
"eventName": "DescribeTable",
155-
"userIdentity": map[string]any{
156-
"arn": "arn:aws:iam::601427279990:user/admin",
157-
},
158-
},
159-
},
160-
}),
161-
want: []s3Record{
162-
{
163-
Message: `{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:iam::601427279990:user/admin"}}`,
164-
Host: "",
165-
},
134+
want: []string{
135+
`{"eventName":"DescribeTable","userIdentity":{"arn":"arn:aws:sts::601427279990:assumed-role/MyRole/i-08014e4f62ccf762d"}}`,
166136
},
167137
},
168138
"multiple records": {
@@ -172,9 +142,9 @@ func TestDecodeCloudTrail(t *testing.T) {
172142
map[string]any{"eventName": "event2"},
173143
},
174144
}),
175-
want: []s3Record{
176-
{Message: `{"eventName":"event1"}`},
177-
{Message: `{"eventName":"event2"}`},
145+
want: []string{
146+
`{"eventName":"event1"}`,
147+
`{"eventName":"event2"}`,
178148
},
179149
},
180150
"empty records array": {
@@ -197,15 +167,15 @@ func TestDecodeCloudTrail(t *testing.T) {
197167
t.Run(name, func(t *testing.T) {
198168
t.Parallel()
199169

200-
var got []s3Record
201-
for rec, err := range decodeCloudTrail(bytes.NewReader(tc.input)) {
170+
var got []string
171+
for msg, err := range decodeCloudTrail(bytes.NewReader(tc.input)) {
202172
if err != nil {
203173
if !tc.wantErr {
204174
t.Fatalf("unexpected error: %v", err)
205175
}
206176
return
207177
}
208-
got = append(got, rec)
178+
got = append(got, msg)
209179
}
210180

211181
if tc.wantErr {

aws/logs_monitoring_go/internal/handling/cloudwatch.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -135,8 +135,8 @@ func (h CloudwatchHandler) newCloudwatchLogEntry(event events.CloudwatchLogsLogE
135135
entry.ID = event.ID
136136
entry.Timestamp = event.Timestamp
137137

138-
if entry.Source == sourceCloudtrail {
139-
if host := cloudtrailHostFromMessage(event.Message); host != "" {
138+
if h.cfg.Host == "" && entry.Source == sourceCloudtrail {
139+
if host := cloudtrailHost(event.Message); host != "" {
140140
entry.Host = host
141141
}
142142
}

aws/logs_monitoring_go/internal/handling/s3.go

Lines changed: 9 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -28,11 +28,6 @@ const (
2828
s3KeyCloudtrail = "_CloudTrail_"
2929
)
3030

31-
type s3Record struct {
32-
Message string
33-
Host string
34-
}
35-
3631
type S3Handler struct {
3732
cfg *config.Config
3833
}
@@ -81,23 +76,27 @@ func (h S3Handler) processRecord(ctx context.Context, client S3APIClient, out ch
8176
}
8277
}()
8378

84-
var records iter.Seq2[s3Record, error]
85-
if cloudTrailRegex().MatchString(key) {
79+
var records iter.Seq2[string, error]
80+
isCloudTrail := cloudTrailRegex().MatchString(key)
81+
if isCloudTrail {
8682
records = decodeCloudTrail(body)
8783
} else {
8884
records = scan(body, h.cfg.S3MultilineLogRegex)
8985
}
9086

91-
for rec, err := range records {
87+
for message, err := range records {
9288
if err != nil {
9389
return err
9490
}
95-
entry := h.newS3LogEntry(eventRecord, rec.Message, lambdaOrigin)
91+
entry := h.newS3LogEntry(eventRecord, message, lambdaOrigin)
9692
if h.cfg.Filter.ShouldExclude(entry.Message) {
9793
continue
9894
}
9995

100-
entry.Host = cmp.Or(h.cfg.Host, rec.Host)
96+
if isCloudTrail {
97+
entry.Host = cloudtrailHost(message)
98+
}
99+
entry.Host = cmp.Or(h.cfg.Host, entry.Host)
101100
entry.Message = h.cfg.Scrubber.Scrub(entry.Message)
102101
if err := concurrent.SafeSender(ctx, out, entry); err != nil {
103102
return err

aws/logs_monitoring_go/internal/handling/scanner.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -80,17 +80,17 @@ func (s *Scanner) splitOnLines(data []byte, atEOF bool) (advance int, token []by
8080
return 0, nil, nil
8181
}
8282

83-
func scan(r io.Reader, re *regexp.Regexp) iter.Seq2[s3Record, error] {
84-
return func(yield func(s3Record, error) bool) {
83+
func scan(r io.Reader, re *regexp.Regexp) iter.Seq2[string, error] {
84+
return func(yield func(string, error) bool) {
8585
scanner := NewScanner(r, re)
8686
for scanner.Scan() {
8787
message := strings.ToValidUTF8(scanner.Text(), "")
88-
if !yield(s3Record{Message: message}, nil) {
88+
if !yield(message, nil) {
8989
return
9090
}
9191
}
9292
if err := scanner.Err(); err != nil {
93-
yield(s3Record{}, err)
93+
yield("", err)
9494
}
9595
}
9696
}

0 commit comments

Comments
 (0)