Skip to content

Commit 10a27da

Browse files
committed
[AWSINTS-3515] feat(go-forwarder): add SNS S3 wrapper
1 parent 076c63b commit 10a27da

5 files changed

Lines changed: 341 additions & 30 deletions

File tree

aws/logs_monitoring_go/cmd/forwarder/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ func handleRequest(cfg *config.Config) func(context.Context, json.RawMessage) er
3737
return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleCloudwatchLogs)
3838
case parsing.InvocationSourceKinesis:
3939
return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleKinesis)
40+
case parsing.InvocationSourceSNS:
41+
return pipeline.Run(ctx, event, cfg, forwarding.S3Storage, parsing.HandleSNS)
4042
case parsing.InvocationSourceS3:
4143
return pipeline.Run(ctx, event, cfg, forwarding.S3Storage, parsing.HandleS3)
4244
default:

aws/logs_monitoring_go/internal/parsing/s3.go

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66
package parsing
77

88
import (
9+
"cmp"
910
"context"
1011
"encoding/json"
1112
"fmt"
1213
"log/slog"
1314
"regexp"
15+
"slices"
1416
"strings"
1517

1618
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
@@ -19,7 +21,7 @@ import (
1921
"github.com/aws/aws-lambda-go/events"
2022
)
2123

22-
type s3RecordContext struct {
24+
type s3Record struct {
2325
metadata model.Metadata
2426
tags model.Tags
2527
source string
@@ -30,19 +32,31 @@ type s3RecordContext struct {
3032
}
3133

3234
func HandleS3(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.S3LogEntry) error {
33-
var s3Event events.S3Event
34-
if err := json.Unmarshal(event, &s3Event); err != nil {
35-
return fmt.Errorf("unmarshal: %w", err)
35+
client, metadata, err := setupS3(ctx, cfg)
36+
if err != nil {
37+
return fmt.Errorf("setup s3: %w", err)
3638
}
39+
return handleS3Event(ctx, event, cfg, client, metadata, out)
40+
}
3741

42+
func setupS3(ctx context.Context, cfg *config.Config) (S3APIClient, model.Metadata, error) {
3843
client, err := createS3APIClient(ctx, cfg.UseFIPS)
3944
if err != nil {
40-
return fmt.Errorf("create S3 client: %w", err)
45+
return nil, model.Metadata{}, fmt.Errorf("create S3 client: %w", err)
4146
}
4247

43-
forwarderMetadata, err := model.GetMetadata(ctx)
48+
metadata, err := model.GetMetadata(ctx)
4449
if err != nil {
45-
return err
50+
return nil, model.Metadata{}, fmt.Errorf("get metadata: %w", err)
51+
}
52+
53+
return client, metadata, nil
54+
}
55+
56+
func handleS3Event(ctx context.Context, event json.RawMessage, cfg *config.Config, client S3APIClient, metadata model.Metadata, out chan<- model.S3LogEntry) error {
57+
var s3Event events.S3Event
58+
if err := json.Unmarshal(event, &s3Event); err != nil {
59+
return fmt.Errorf("unmarshal: %w", err)
4660
}
4761

4862
for _, record := range s3Event.Records {
@@ -51,30 +65,36 @@ func HandleS3(ctx context.Context, event json.RawMessage, cfg *config.Config, ou
5165

5266
tags, service := getTagsAndService(cfg)
5367
source := getS3Source(cfg.Source, key)
54-
if service == "" {
55-
service = source
56-
}
57-
58-
rc := s3RecordContext{
59-
forwarderMetadata, tags, source, service, bucket, key, cfg.S3MultilineLogRegex,
68+
service = cmp.Or(service, source)
69+
70+
rc := s3Record{
71+
metadata: metadata,
72+
tags: tags,
73+
source: source,
74+
service: service,
75+
bucket: bucket,
76+
key: key,
77+
multilineRegex: cfg.S3MultilineLogRegex,
6078
}
6179
if err := processS3Record(ctx, client, out, rc); err != nil {
62-
return fmt.Errorf("process S3 record: %w", err)
80+
slog.WarnContext(ctx, "skipping s3 record",
81+
"bucket", bucket, "key", key, "error", err)
82+
continue
6383
}
6484
}
6585

6686
return nil
6787
}
6888

69-
func processS3Record(ctx context.Context, client S3APIClient, out chan<- model.S3LogEntry, rc s3RecordContext) error {
89+
func processS3Record(ctx context.Context, client S3APIClient, out chan<- model.S3LogEntry, rc s3Record) error {
7090
body, err := getS3Object(ctx, client, rc.bucket, rc.key)
7191
if err != nil {
7292
return err
7393
}
7494

7595
defer func() {
7696
if err := body.Close(); err != nil {
77-
slog.Warn("failed to close response body", slog.Any("error", err))
97+
slog.WarnContext(ctx, "failed to close response body", slog.Any("error", err))
7898
}
7999
}()
80100

@@ -93,15 +113,15 @@ func processS3Record(ctx context.Context, client S3APIClient, out chan<- model.S
93113
return nil
94114
}
95115

96-
func makeS3Entry(rc s3RecordContext, message string) model.S3LogEntry {
116+
func makeS3Entry(rc s3Record, message string) model.S3LogEntry {
97117
ddtags, ddtagsService, message := extractFromMessage(message)
98118

99119
entryService := rc.service
100120
if ddtagsService != "" {
101121
entryService = ddtagsService
102122
}
103123

104-
ddtags = append(ddtags, "service:"+entryService)
124+
tags := slices.Concat(ddtags, model.Tags{"service:" + entryService}, rc.tags)
105125
metadata := model.S3Metadata{
106126
Metadata: rc.metadata,
107127
S3Context: model.S3Context{
@@ -115,7 +135,7 @@ func makeS3Entry(rc s3RecordContext, message string) model.S3LogEntry {
115135
Source: rc.source,
116136
SourceCategory: sourceCategory,
117137
Service: entryService,
118-
Tags: append(ddtags, rc.tags...),
138+
Tags: tags,
119139
Metadata: metadata,
120140
}
121141
}

aws/logs_monitoring_go/internal/parsing/s3_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func TestProcessS3Record(t *testing.T) {
2323
tests := map[string]struct {
2424
mockSetup func(m *MockS3APIClient)
2525
chanSize int
26-
rc s3RecordContext
26+
rc s3Record
2727
want []model.S3LogEntry
2828
wantErr bool
2929
}{
@@ -35,7 +35,7 @@ func TestProcessS3Record(t *testing.T) {
3535
}, nil)
3636
},
3737
chanSize: 1,
38-
rc: s3RecordContext{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
38+
rc: s3Record{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
3939
want: []model.S3LogEntry{{
4040
Message: "line1",
4141
Source: "s3",
@@ -55,7 +55,7 @@ func TestProcessS3Record(t *testing.T) {
5555
}, nil)
5656
},
5757
chanSize: 3,
58-
rc: s3RecordContext{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
58+
rc: s3Record{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
5959
want: []model.S3LogEntry{
6060
{Message: "line1", Source: "s3", SourceCategory: "aws", Service: "s3", Tags: model.Tags{"service:s3"}, Metadata: model.S3Metadata{S3Context: model.S3Context{Bucket: "b", Key: "k"}}},
6161
{Message: "line2", Source: "s3", SourceCategory: "aws", Service: "s3", Tags: model.Tags{"service:s3"}, Metadata: model.S3Metadata{S3Context: model.S3Context{Bucket: "b", Key: "k"}}},
@@ -70,7 +70,7 @@ func TestProcessS3Record(t *testing.T) {
7070
}, nil)
7171
},
7272
chanSize: 0,
73-
rc: s3RecordContext{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
73+
rc: s3Record{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
7474
want: nil,
7575
},
7676
"s3_error": {
@@ -79,7 +79,7 @@ func TestProcessS3Record(t *testing.T) {
7979
Return(nil, errors.New("access denied"))
8080
},
8181
chanSize: 1,
82-
rc: s3RecordContext{bucket: "b", key: "k"},
82+
rc: s3Record{bucket: "b", key: "k"},
8383
wantErr: true,
8484
},
8585
"ddtags_extraction": {
@@ -90,7 +90,7 @@ func TestProcessS3Record(t *testing.T) {
9090
}, nil)
9191
},
9292
chanSize: 1,
93-
rc: s3RecordContext{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
93+
rc: s3Record{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
9494
want: []model.S3LogEntry{{
9595
Message: `{"msg":"hello"}`,
9696
Source: "s3",
@@ -108,7 +108,7 @@ func TestProcessS3Record(t *testing.T) {
108108
}, nil)
109109
},
110110
chanSize: 1,
111-
rc: s3RecordContext{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
111+
rc: s3Record{tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k"},
112112
want: []model.S3LogEntry{{
113113
Message: "helloworld",
114114
Source: "s3",
@@ -126,7 +126,7 @@ func TestProcessS3Record(t *testing.T) {
126126
}, nil)
127127
},
128128
chanSize: 2,
129-
rc: s3RecordContext{
129+
rc: s3Record{
130130
tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k",
131131
multilineRegex: regexp.MustCompile(`\d{4}-\d{2}-\d{2}`),
132132
},
@@ -143,7 +143,7 @@ func TestProcessS3Record(t *testing.T) {
143143
}, nil)
144144
},
145145
chanSize: 1,
146-
rc: s3RecordContext{
146+
rc: s3Record{
147147
tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k",
148148
multilineRegex: regexp.MustCompile(`\d{4}-\d{2}-\d{2}`),
149149
},
@@ -164,7 +164,7 @@ func TestProcessS3Record(t *testing.T) {
164164
}, nil)
165165
},
166166
chanSize: 3,
167-
rc: s3RecordContext{
167+
rc: s3Record{
168168
tags: model.Tags{}, source: "s3", service: "s3", bucket: "b", key: "k",
169169
multilineRegex: regexp.MustCompile(`\d{4}-\d{2}-\d{2}`),
170170
},
@@ -182,7 +182,7 @@ func TestProcessS3Record(t *testing.T) {
182182
}, nil)
183183
},
184184
chanSize: 1,
185-
rc: s3RecordContext{tags: model.Tags{"env:prod", "team:aws"}, source: "s3", service: "s3", bucket: "b", key: "k"},
185+
rc: s3Record{tags: model.Tags{"env:prod", "team:aws"}, source: "s3", service: "s3", bucket: "b", key: "k"},
186186
want: []model.S3LogEntry{{
187187
Message: "line1",
188188
Source: "s3",
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package parsing
7+
8+
import (
9+
"context"
10+
"encoding/json"
11+
"fmt"
12+
"log/slog"
13+
14+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
15+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
16+
"github.com/aws/aws-lambda-go/events"
17+
)
18+
19+
func HandleSNS(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.S3LogEntry) error {
20+
var snsEvent events.SNSEvent
21+
if err := json.Unmarshal(event, &snsEvent); err != nil {
22+
return fmt.Errorf("unmarshal: %w", err)
23+
}
24+
25+
client, metadata, err := setupS3(ctx, cfg)
26+
if err != nil {
27+
return fmt.Errorf("setup s3 for sns: %w", err)
28+
}
29+
30+
for _, record := range snsEvent.Records {
31+
if err := handleSNSRecord(ctx, record, cfg, client, metadata, out); err != nil {
32+
slog.WarnContext(ctx, "skipping sns record", "error", err)
33+
continue
34+
}
35+
}
36+
return nil
37+
}
38+
39+
func handleSNSRecord(ctx context.Context, record events.SNSEventRecord, cfg *config.Config, client S3APIClient, metadata model.Metadata, out chan<- model.S3LogEntry) error {
40+
s3Raw := json.RawMessage(record.SNS.Message)
41+
return handleS3Event(ctx, s3Raw, cfg, client, metadata, out)
42+
}

0 commit comments

Comments
 (0)