Skip to content

Commit 2236c14

Browse files
authored
[AWSINTS-3506] feat(go-forwarder): add S3 pipeline (with multiline regex) (#1108)
* fix Python behavior bug by always splitting on the defined regex
1 parent f6e45c4 commit 2236c14

16 files changed

Lines changed: 894 additions & 25 deletions

File tree

aws/logs_monitoring_go/cmd/forwarder/main.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"log/slog"
1212

1313
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
14+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding"
1415
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing"
1516
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline"
1617

@@ -33,7 +34,9 @@ func handleRequest(cfg *config.Config) func(context.Context, json.RawMessage) er
3334
invocationSource := parsing.DetectInvocationSource(event)
3435
switch invocationSource {
3536
case parsing.InvocationSourceCloudwatchLogs:
36-
return pipeline.Run(ctx, event, cfg, parsing.HandleCloudwatchLogs)
37+
return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleCloudwatchLogs)
38+
case parsing.InvocationSourceS3:
39+
return pipeline.Run(ctx, event, cfg, forwarding.S3Storage, parsing.HandleS3)
3740
default:
3841
slog.Error("unsupported invocation source", slog.String("source", invocationSource.String()))
3942
return nil

aws/logs_monitoring_go/go.mod

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,19 @@ require (
1111
)
1212

1313
require (
14+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8 // indirect
1415
github.com/aws/aws-sdk-go-v2/credentials v1.19.12 // indirect
1516
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20 // indirect
1617
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21 // indirect
1718
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21 // indirect
1819
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 // indirect
20+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.22 // indirect
1921
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 // indirect
20-
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.20 // indirect
22+
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.13 // indirect
23+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.21 // indirect
24+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.21 // indirect
2125
github.com/aws/aws-sdk-go-v2/service/kms v1.50.4 // indirect
26+
github.com/aws/aws-sdk-go-v2/service/s3 v1.99.0 // indirect
2227
github.com/aws/aws-sdk-go-v2/service/signin v1.0.8 // indirect
2328
github.com/aws/aws-sdk-go-v2/service/ssm v1.68.4 // indirect
2429
github.com/aws/aws-sdk-go-v2/service/sso v1.30.13 // indirect

aws/logs_monitoring_go/go.sum

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ github.com/aws/aws-sdk-go-v2 v1.41.4 h1:10f50G7WyU02T56ox1wWXq+zTX9I1zxG46HYuG1h
44
github.com/aws/aws-sdk-go-v2 v1.41.4/go.mod h1:mwsPRE8ceUUpiTgF7QmQIJ7lgsKUPQOUl3o72QBrE1o=
55
github.com/aws/aws-sdk-go-v2 v1.41.5 h1:dj5kopbwUsVUVFgO4Fi5BIT3t4WyqIDjGKCangnV/yY=
66
github.com/aws/aws-sdk-go-v2 v1.41.5/go.mod h1:mwsPRE8ceUUpiTgF7QmQIJ7lgsKUPQOUl3o72QBrE1o=
7+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8 h1:eBMB84YGghSocM7PsjmmPffTa+1FBUeNvGvFou6V/4o=
8+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8/go.mod h1:lyw7GFp3qENLh7kwzf7iMzAxDn+NzjXEAGjKS2UOKqI=
79
github.com/aws/aws-sdk-go-v2/config v1.32.12 h1:O3csC7HUGn2895eNrLytOJQdoL2xyJy0iYXhoZ1OmP0=
810
github.com/aws/aws-sdk-go-v2/config v1.32.12/go.mod h1:96zTvoOFR4FURjI+/5wY1vc1ABceROO4lWgWJuxgy0g=
911
github.com/aws/aws-sdk-go-v2/credentials v1.19.12 h1:oqtA6v+y5fZg//tcTWahyN9PEn5eDU/Wpvc2+kJ4aY8=
@@ -20,12 +22,22 @@ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21 h1:PEgGVtPoB6NTpPrBgq
2022
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21/go.mod h1:p+hz+PRAYlY3zcpJhPwXlLC4C+kqn70WIHwnzAfs6ps=
2123
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 h1:qYQ4pzQ2Oz6WpQ8T3HvGHnZydA72MnLuFK9tJwmrbHw=
2224
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6/go.mod h1:O3h0IK87yXci+kg6flUKzJnWeziQUKciKrLjcatSNcY=
25+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.22 h1:rWyie/PxDRIdhNf4DzRk0lvjVOqFJuNnO8WwaIRVxzQ=
26+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.22/go.mod h1:zd/JsJ4P7oGfUhXn1VyLqaRZwPmZwg44Jf2dS84Dm3Y=
2327
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 h1:5EniKhLZe4xzL7a+fU3C2tfUN4nWIqlLesfrjkuPFTY=
2428
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7/go.mod h1:x0nZssQ3qZSnIcePWLvcoFisRXJzcTVvYpAAdYX8+GI=
29+
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.13 h1:JRaIgADQS/U6uXDqlPiefP32yXTda7Kqfx+LgspooZM=
30+
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.13/go.mod h1:CEuVn5WqOMilYl+tbccq8+N2ieCy0gVn3OtRb0vBNNM=
2531
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.20 h1:2HvVAIq+YqgGotK6EkMf+KIEqTISmTYh5zLpYyeTo1Y=
2632
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.20/go.mod h1:V4X406Y666khGa8ghKmphma/7C0DAtEQYhkq9z4vpbk=
33+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.21 h1:c31//R3xgIJMSC8S6hEVq+38DcvUlgFY0FM6mSI5oto=
34+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.21/go.mod h1:r6+pf23ouCB718FUxaqzZdbpYFyDtehyZcmP5KL9FkA=
35+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.21 h1:ZlvrNcHSFFWURB8avufQq9gFsheUgjVD9536obIknfM=
36+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.21/go.mod h1:cv3TNhVrssKR0O/xxLJVRfd2oazSnZnkUeTf6ctUwfQ=
2737
github.com/aws/aws-sdk-go-v2/service/kms v1.50.4 h1:PgD1y0ZagPokGIZPmejCBUySBzOFDN+leZxCOfb1OEQ=
2838
github.com/aws/aws-sdk-go-v2/service/kms v1.50.4/go.mod h1:FfXDb5nXrsoGgxsBFxwxr3vdHXheC2tV+6lmuLghhjQ=
39+
github.com/aws/aws-sdk-go-v2/service/s3 v1.99.0 h1:hlSuz394kV0vhv9drL5lhuEFbEOEP1VyQpy15qWh1Pk=
40+
github.com/aws/aws-sdk-go-v2/service/s3 v1.99.0/go.mod h1:uoA43SdFwacedBfSgfFSjjCvYe8aYBS7EnU5GZ/YKMM=
2941
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.4 h1:9aZbO86sraeCIHHCpZhxwN9tnVy9POkSKzi4/TpT54A=
3042
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.4/go.mod h1:cxiXDhEzIq7Xx1BtmC4lGBK3SwAZ79+EUWiKawYHo14=
3143
github.com/aws/aws-sdk-go-v2/service/signin v1.0.8 h1:0GFOLzEbOyZABS3PhYfBIx2rNBACYcKty+XGkTgw1ow=

aws/logs_monitoring_go/internal/config/config.go

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,24 @@ import (
99
"context"
1010
"fmt"
1111
"log/slog"
12+
"regexp"
1213
)
1314

1415
var ForwarderVersion = "6.0"
1516

1617
type Config struct {
17-
APIKey string
18-
Site string
19-
IntakeURL string
20-
APIURL string
21-
LogLevel string
22-
UseFIPS bool
23-
Source string
24-
Host string
25-
CustomTags string
26-
Scrubbing ScrubbingConfig
27-
Filtering FilteringConfig
18+
APIKey string
19+
Site string
20+
IntakeURL string
21+
APIURL string
22+
LogLevel string
23+
UseFIPS bool
24+
Source string
25+
Host string
26+
CustomTags string
27+
Scrubbing ScrubbingConfig
28+
Filtering FilteringConfig
29+
S3MultilineLogRegex *regexp.Regexp
2830
}
2931

3032
type ScrubbingConfig struct {
@@ -58,7 +60,9 @@ func Load(ctx context.Context) (*Config, error) {
5860
}
5961

6062
func loadConfig() *Config {
63+
S3MultilineLogRegex := loadS3MultilineLogRegex()
6164
site := envOrDefault("DD_SITE", "datadoghq.com")
65+
6266
return &Config{
6367
Site: site,
6468
IntakeURL: envOrDefault("DD_URL", "https://http-intake.logs."+site+"/api/v2/logs"),
@@ -78,7 +82,23 @@ func loadConfig() *Config {
7882
IncludePattern: envOrDefault("INCLUDE_AT_MATCH", ""),
7983
ExcludePattern: envOrDefault("EXCLUDE_AT_MATCH", ""),
8084
},
85+
S3MultilineLogRegex: S3MultilineLogRegex,
86+
}
87+
}
88+
89+
func loadS3MultilineLogRegex() *regexp.Regexp {
90+
pattern := envOrDefault("DD_MULTILINE_LOG_REGEX_PATTERN", "")
91+
if pattern == "" {
92+
return nil
8193
}
94+
95+
re, err := regexp.Compile(pattern)
96+
if err != nil {
97+
slog.Error("invalid multiline log pattern", slog.String("pattern", pattern), slog.Any("error", err))
98+
return nil
99+
}
100+
101+
return re
82102
}
83103

84104
func (c Config) LogValue() slog.Value {
@@ -93,5 +113,6 @@ func (c Config) LogValue() slog.Value {
93113
slog.Bool("customScrubbing", c.Scrubbing.CustomRule != ""),
94114
slog.Bool("includeFilter", c.Filtering.IncludePattern != ""),
95115
slog.Bool("excludeFilter", c.Filtering.ExcludePattern != ""),
116+
slog.Bool("multilineRegex", c.S3MultilineLogRegex != nil),
96117
)
97118
}

aws/logs_monitoring_go/internal/config/config_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,43 @@ func TestLoadConfig(t *testing.T) {
120120
})
121121
}
122122
}
123+
124+
func TestLoadS3MultilineLogRegex(t *testing.T) {
125+
tests := map[string]struct {
126+
env string
127+
wantNil bool
128+
}{
129+
"empty_pattern_returns_nil": {
130+
env: "",
131+
wantNil: true,
132+
},
133+
"valid_pattern_returns_compiled_regex": {
134+
env: `\d{4}-\d{2}-\d{2}`,
135+
},
136+
"invalid_pattern_returns_nil": {
137+
env: `[invalid`,
138+
wantNil: true,
139+
},
140+
}
141+
142+
for name, tc := range tests {
143+
t.Run(name, func(t *testing.T) {
144+
if tc.env != "" {
145+
t.Setenv("DD_MULTILINE_LOG_REGEX_PATTERN", tc.env)
146+
}
147+
148+
got := loadS3MultilineLogRegex()
149+
150+
if tc.wantNil {
151+
if got != nil {
152+
t.Fatalf("want nil, got `%v", got)
153+
}
154+
return
155+
}
156+
157+
if got == nil {
158+
t.Fatal("want non-nil, got nil")
159+
}
160+
})
161+
}
162+
}

aws/logs_monitoring_go/internal/forwarding/forwarding.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,23 @@ import (
1919
"golang.org/x/sync/errgroup"
2020
)
2121

22-
const numWorkers = 3
22+
const (
23+
numWorkers = 3
24+
CloudwatchStorage = "cloudwatch"
25+
S3Storage = "s3"
26+
)
2327

2428
type Forwarder struct {
25-
config *config.Config
26-
client *http.Client
29+
config *config.Config
30+
client *http.Client
31+
storage string
2732
}
2833

29-
func NewForwarder(config *config.Config, client *http.Client) Forwarder {
34+
func NewForwarder(config *config.Config, client *http.Client, storage string) Forwarder {
3035
return Forwarder{
31-
config: config,
32-
client: client,
36+
config: config,
37+
client: client,
38+
storage: storage,
3339
}
3440
}
3541

@@ -78,7 +84,7 @@ func (f Forwarder) send(ctx context.Context, body []byte) error {
7884
req.Header.Set("Content-Encoding", "gzip")
7985
req.Header.Set("DD-EVP-ORIGIN", "aws_forwarder")
8086
req.Header.Set("DD-EVP-ORIGIN-VERSION", config.ForwarderVersion)
81-
req.Header.Set("DD-STORAGE-TAG", "cloudwatch")
87+
req.Header.Set("DD-STORAGE-TAG", f.storage)
8288

8389
resp, err := f.client.Do(req)
8490
if err != nil {

aws/logs_monitoring_go/internal/forwarding/forwarding_test.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func TestForward(t *testing.T) {
2323

2424
tests := map[string]struct {
2525
statusCode int
26+
storage string
2627
payloads [][]byte
2728
cancelCtx bool
2829
wantErr bool
@@ -31,40 +32,52 @@ func TestForward(t *testing.T) {
3132
}{
3233
"single_message_accepted": {
3334
statusCode: http.StatusAccepted,
35+
storage: CloudwatchStorage,
3436
payloads: [][]byte{[]byte("test payload")},
3537
wantCalls: 1,
3638
},
3739
"multiple_messages_accepted": {
3840
statusCode: http.StatusAccepted,
41+
storage: CloudwatchStorage,
3942
payloads: [][]byte{[]byte("first"), []byte("second"), []byte("third")},
4043
wantCalls: 3,
4144
},
4245
"empty_channel": {
4346
statusCode: http.StatusAccepted,
47+
storage: CloudwatchStorage,
4448
payloads: [][]byte{},
4549
wantCalls: 0,
4650
},
4751
"server_returns_400": {
4852
statusCode: http.StatusBadRequest,
53+
storage: CloudwatchStorage,
4954
payloads: [][]byte{[]byte("test payload")},
5055
wantErr: true,
5156
wantErrMsg: "unexpected status from intake",
5257
wantCalls: 1,
5358
},
5459
"server_returns_500": {
5560
statusCode: http.StatusInternalServerError,
61+
storage: CloudwatchStorage,
5662
payloads: [][]byte{[]byte("test payload")},
5763
wantErr: true,
5864
wantErrMsg: "unexpected status from intake",
5965
wantCalls: 1,
6066
},
6167
"context_cancelled": {
6268
statusCode: http.StatusAccepted,
69+
storage: CloudwatchStorage,
6370
payloads: [][]byte{[]byte("test payload")},
6471
cancelCtx: true,
6572
wantErr: true,
6673
wantCalls: 0,
6774
},
75+
"s3_storage": {
76+
statusCode: http.StatusAccepted,
77+
storage: S3Storage,
78+
payloads: [][]byte{[]byte("test payload")},
79+
wantCalls: 1,
80+
},
6881
}
6982

7083
for name, tc := range tests {
@@ -91,6 +104,9 @@ func TestForward(t *testing.T) {
91104
if got := req.Header.Get("DD-EVP-ORIGIN-VERSION"); got != config.ForwarderVersion {
92105
t.Errorf("DD-EVP-ORIGIN-VERSION = %q, want %q", got, config.ForwarderVersion)
93106
}
107+
if got := req.Header.Get("DD-STORAGE-TAG"); got != tc.storage {
108+
t.Errorf("DD-STORAGE-TAG = %q, want %q", got, tc.storage)
109+
}
94110

95111
gr, err := gzip.NewReader(req.Body)
96112
if err != nil {
@@ -111,10 +127,7 @@ func TestForward(t *testing.T) {
111127
}))
112128
defer server.Close()
113129

114-
f := NewForwarder(&config.Config{
115-
IntakeURL: server.URL,
116-
APIKey: "test-api-key",
117-
}, server.Client())
130+
f := NewForwarder(&config.Config{IntakeURL: server.URL, APIKey: "test-api-key"}, server.Client(), tc.storage)
118131

119132
ctx, cancel := context.WithCancel(context.Background())
120133
defer cancel()
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package model
7+
8+
type S3LogEntry struct {
9+
Message string `json:"message"`
10+
Source string `json:"ddsource"`
11+
SourceCategory string `json:"ddsourcecategory"`
12+
Service string `json:"service"`
13+
Tags Tags `json:"ddtags"`
14+
Metadata S3Metadata `json:"aws"`
15+
}
16+
17+
type S3Metadata struct {
18+
Metadata
19+
S3Context S3Context `json:"s3"`
20+
}
21+
22+
type S3Context struct {
23+
Bucket string `json:"bucket"`
24+
Key string `json:"key"`
25+
}

0 commit comments

Comments
 (0)