Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 38 additions & 31 deletions aws/logs_monitoring_go/internal/handling/cloudtrail.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@ import (
"fmt"
"io"
"iter"
"log/slog"
"regexp"
"strings"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
)

var (
Expand All @@ -25,15 +21,40 @@ var (
func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] {
return func(yield func(string, error) bool) {
dec := json.NewDecoder(r)
if err := parsing.SkipToRecords(dec); err != nil {
yield("", fmt.Errorf("cloudtrail: %w", err))
t, err := dec.Token()
if err != nil {
yield("", err)
return
}
if t != json.Delim('{') {
yield("", fmt.Errorf(`expected "{" token, got %q`, t))
return
}

t, err = dec.Token()
if err != nil {
yield("", err)
return
}
if t != "Records" {
yield("", fmt.Errorf(`expected "Records" token, got %q`, t))
return
}

t, err = dec.Token()
if err != nil {
yield("", err)
return
}
if t != json.Delim('[') {
yield("", fmt.Errorf(`expected "[" token, got %q`, t))
return
}

for dec.More() {
var raw json.RawMessage
if err := dec.Decode(&raw); err != nil {
yield("", fmt.Errorf("decode cloudtrail record: %w", err))
yield("", fmt.Errorf(`decode: %w`, err))
return
}
if !yield(string(raw), nil) {
Expand All @@ -43,33 +64,19 @@ func decodeCloudTrail(r io.Reader) iter.Seq2[string, error] {
}
}

func cloudtrailHost(message string) (host string) {
dec := json.NewDecoder(strings.NewReader(message))
if err := parsing.SkipBrace(dec); err != nil {
return
}

if err := parsing.SkipToKey(dec, "userIdentity"); err != nil {
return
func cloudtrailHost(message string) string {
var record struct {
UserIdentity struct {
ARN string `json:"arn"`
} `json:"userIdentity"`
}

if err := parsing.SkipBrace(dec); err != nil {
return
}

if err := parsing.SkipToKey(dec, "arn"); err != nil {
return
}

var arn string
if err := dec.Decode(&arn); err != nil {
return
if err := json.Unmarshal([]byte(message), &record); err != nil {
return ""
}

matches := ec2InstanceRegexp.FindStringSubmatch(arn)
matches := ec2InstanceRegexp.FindStringSubmatch(record.UserIdentity.ARN)
if matches != nil {
host = matches[ec2InstanceRegexp.SubexpIndex("host")]
slog.Debug("ec2 host found in userIdentity.arn")
return matches[ec2InstanceRegexp.SubexpIndex("host")]
}
return
return ""
}
21 changes: 9 additions & 12 deletions aws/logs_monitoring_go/internal/handling/eventbridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@
package handling

import (
"bytes"
"cmp"
"context"
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/filtering"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/scrubbing"
)

Expand All @@ -42,7 +41,7 @@ func (h *eventBridgeHandler) Handle(ctx context.Context, event json.RawMessage,

source, err := eventBridgeSource(event)
if err != nil {
return fmt.Errorf("source: %w", err)
return err
}

switch source {
Expand Down Expand Up @@ -97,21 +96,19 @@ func (h *eventBridgeHandler) newEntry(source string, lambdaOrigin model.LambdaOr
}

func eventBridgeSource(event json.RawMessage) (string, error) {
dec := json.NewDecoder(bytes.NewReader(event))
if err := parsing.SkipBrace(dec); err != nil {
return "", err
var s struct {
Source string `json:"source"`
}

if err := parsing.SkipToKey(dec, "source"); err != nil {
return "", err
if err := json.Unmarshal(event, &s); err != nil {
return "", fmt.Errorf("unmarshal: %w", err)
}

var rawSource string
if err := dec.Decode(&rawSource); err != nil {
return "", fmt.Errorf("decode: %w", err)
if s.Source == "" {
return "", errors.New("eventbridge source not found")
}

_, source, found := strings.Cut(rawSource, ".")
_, source, found := strings.Cut(s.Source, ".")
if found {
return source, nil
}
Expand Down
84 changes: 84 additions & 0 deletions aws/logs_monitoring_go/internal/parsing/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package parsing

import (
"encoding/json"
"testing"
)

var (
benchCloudWatch = json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`)
benchS3 = json.RawMessage(`{"Records":[{"eventSource":"aws:s3","s3":{"bucket":{"name":"b"},"object":{"key":"k"}}}]}`)
benchSNSS3 = json.RawMessage(`{"Records":[{"EventSource":"aws:sns","Sns":{"Type":"Notification","Message":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"b\"},\"object\":{\"key\":\"k\"}}}]}"}}]}`)
benchSQSS3 = json.RawMessage(`{"Records":[` +
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b1\"},\"object\":{\"key\":\"k1\"}}}]}"},` +
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b2\"},\"object\":{\"key\":\"k2\"}}}]}"},` +
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b3\"},\"object\":{\"key\":\"k3\"}}}]}"},` +
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b4\"},\"object\":{\"key\":\"k4\"}}}]}"},` +
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b5\"},\"object\":{\"key\":\"k5\"}}}]}"},` +
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b6\"},\"object\":{\"key\":\"k6\"}}}]}"},` +
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b7\"},\"object\":{\"key\":\"k7\"}}}]}"},` +
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b8\"},\"object\":{\"key\":\"k8\"}}}]}"},` +
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b9\"},\"object\":{\"key\":\"k9\"}}}]}"},` +
`{"eventSource":"aws:sqs","body":"{\"Records\":[{\"s3\":{\"bucket\":{\"name\":\"b10\"},\"object\":{\"key\":\"k10\"}}}]}"}` +
`]}`)
benchSQSSNSS3 = json.RawMessage(`{"Records":[` +
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b1\\\"},\\\"object\\\":{\\\"key\\\":\\\"k1\\\"}}}]}\"}"},` +
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b2\\\"},\\\"object\\\":{\\\"key\\\":\\\"k2\\\"}}}]}\"}"},` +
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b3\\\"},\\\"object\\\":{\\\"key\\\":\\\"k3\\\"}}}]}\"}"},` +
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b4\\\"},\\\"object\\\":{\\\"key\\\":\\\"k4\\\"}}}]}\"}"},` +
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b5\\\"},\\\"object\\\":{\\\"key\\\":\\\"k5\\\"}}}]}\"}"},` +
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b6\\\"},\\\"object\\\":{\\\"key\\\":\\\"k6\\\"}}}]}\"}"},` +
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b7\\\"},\\\"object\\\":{\\\"key\\\":\\\"k7\\\"}}}]}\"}"},` +
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b8\\\"},\\\"object\\\":{\\\"key\\\":\\\"k8\\\"}}}]}\"}"},` +
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b9\\\"},\\\"object\\\":{\\\"key\\\":\\\"k9\\\"}}}]}\"}"},` +
`{"eventSource":"aws:sqs","body":"{\"Type\":\"Notification\",\"Message\":\"{\\\"Records\\\":[{\\\"eventSource\\\":\\\"aws:s3\\\",\\\"s3\\\":{\\\"bucket\\\":{\\\"name\\\":\\\"b10\\\"},\\\"object\\\":{\\\"key\\\":\\\"k10\\\"}}}]}\"}"}` +
`]}`)
benchEventBridge = json.RawMessage(`{"version":"0","detail-type":"Object Created","source":"aws.s3","detail":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}}}`)
)

func BenchmarkParse_CloudWatch(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
_, _ = Parse(benchCloudWatch)
}
}

func BenchmarkParse_S3(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
_, _ = Parse(benchS3)
}
}

func BenchmarkParse_SNS_S3(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
_, _ = Parse(benchSNSS3)
}
}

func BenchmarkParse_SQS_S3(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
_, _ = Parse(benchSQSS3)
}
}

func BenchmarkParse_SQS_SNS_S3(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
_, _ = Parse(benchSQSSNSS3)
}
}

func BenchmarkParse_EventBridge(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
_, _ = Parse(benchEventBridge)
}
}
7 changes: 4 additions & 3 deletions aws/logs_monitoring_go/internal/parsing/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const (
ContentTypeRetry
)

type ParsedEvent struct {
ContentType ContentType
Payload json.RawMessage
type Event struct {
ContentType ContentType
Payload json.RawMessage
SQSReceiptHandle string
}
26 changes: 0 additions & 26 deletions aws/logs_monitoring_go/internal/parsing/error.go

This file was deleted.

Loading
Loading