Skip to content

Commit 3daa8d9

Browse files
committed
fix(go-forwarder): add scanner to handle delimiters
1 parent 25b5d71 commit 3daa8d9

7 files changed

Lines changed: 102 additions & 48 deletions

File tree

aws/logs_monitoring_go/internal/config/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,6 @@ func (c Config) LogValue() slog.Value {
113113
slog.Bool("customScrubbing", c.Scrubbing.CustomRule != ""),
114114
slog.Bool("includeFilter", c.Filtering.IncludePattern != ""),
115115
slog.Bool("excludeFilter", c.Filtering.ExcludePattern != ""),
116-
slog.Bool("multilineRegex", c.S3MultilineLogRegex.String() != ""),
116+
slog.Bool("multilineRegex", c.S3MultilineLogRegex != nil),
117117
)
118118
}

aws/logs_monitoring_go/internal/model/cloudwatchlogs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ type CloudwatchLogEntry struct {
1818
}
1919

2020
type CloudwatchMetadata struct {
21-
ForwarderMetadata
21+
Metadata
2222
Logs CloudwatchLogsContext `json:"awslogs"`
2323
}
2424

aws/logs_monitoring_go/internal/model/forwarder.go

Lines changed: 0 additions & 34 deletions
This file was deleted.

aws/logs_monitoring_go/internal/model/s3.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ type S3LogEntry struct {
1515
}
1616

1717
type S3Metadata struct {
18-
ForwarderMetadata
18+
Metadata
1919
S3Context S3Context `json:"s3"`
2020
}
2121

aws/logs_monitoring_go/internal/parsing/s3.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@ import (
2121
)
2222

2323
type s3RecordContext struct {
24-
forwarderMetadata model.ForwarderMetadata
25-
tags model.Tags
26-
source string
27-
service string
28-
bucket string
29-
key string
30-
multilineRegex *regexp.Regexp
24+
metadata model.Metadata
25+
tags model.Tags
26+
source string
27+
service string
28+
bucket string
29+
key string
30+
multilineRegex *regexp.Regexp
3131
}
3232

3333
func HandleS3(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.S3LogEntry) error {
@@ -41,13 +41,16 @@ func HandleS3(ctx context.Context, event json.RawMessage, cfg *config.Config, ou
4141
return fmt.Errorf("create S3 client: %w", err)
4242
}
4343

44-
forwarderMetadata := model.GetForwarderMetadata(ctx)
44+
forwarderMetadata, err := model.GetMetadata(ctx)
45+
if err != nil {
46+
return err
47+
}
4548

4649
for _, record := range s3Event.Records {
4750
bucket := record.S3.Bucket.Name
4851
key := record.S3.Object.URLDecodedKey
4952

50-
tags, service := getTagsAndService(*cfg)
53+
tags, service := getTagsAndService(cfg)
5154
source := getS3Source(cfg.Source, key)
5255
if service == "" {
5356
service = source
@@ -78,8 +81,9 @@ func processS3Record(ctx context.Context, client S3APIClient, out chan<- model.S
7881

7982
var buf strings.Builder
8083
scanner := bufio.NewScanner(body)
84+
scanner.Split(split)
8185
for scanner.Scan() {
82-
line := scanner.Text()
86+
line := strings.ToValidUTF8(scanner.Text(), "")
8387
if rc.multilineRegex != nil {
8488
if rc.multilineRegex.MatchString(line) {
8589
if buf.Len() > 0 {
@@ -124,7 +128,7 @@ func makeS3Entry(rc s3RecordContext, message string) model.S3LogEntry {
124128

125129
ddtags = append(ddtags, "service:"+entryService)
126130
metadata := model.S3Metadata{
127-
ForwarderMetadata: rc.forwarderMetadata,
131+
Metadata: rc.metadata,
128132
S3Context: model.S3Context{
129133
Bucket: rc.bucket,
130134
Key: rc.key,
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package parsing
7+
8+
import "bytes"
9+
10+
func split(data []byte, atEOF bool) (advance int, token []byte, err error) {
11+
if atEOF && len(data) == 0 {
12+
return 0, nil, nil
13+
}
14+
15+
if i := bytes.IndexAny(data, "\n\r\f"); i >= 0 {
16+
j := i
17+
for j < len(data) && (data[j] == '\n' || data[j] == '\r' || data[j] == '\f') {
18+
j++
19+
}
20+
return j, data[:i], nil
21+
}
22+
23+
if atEOF {
24+
return len(data), data, nil
25+
}
26+
27+
return 0, nil, nil
28+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package parsing
7+
8+
import (
9+
"bufio"
10+
"strings"
11+
"testing"
12+
13+
"github.com/google/go-cmp/cmp"
14+
)
15+
16+
func TestSplit(t *testing.T) {
17+
t.Parallel()
18+
19+
tests := map[string]struct {
20+
input string
21+
want []string
22+
}{
23+
"empty_string": {input: "", want: nil},
24+
"plain_string": {input: "hello", want: []string{"hello"}},
25+
"new_lines": {input: "a\nb\nc", want: []string{"a", "b", "c"}},
26+
"trailing_new_line": {input: "a\nb\n", want: []string{"a", "b"}},
27+
"crlf": {input: "a\r\nb\r\nc", want: []string{"a", "b", "c"}},
28+
"form_feed": {input: "a\fb\fc", want: []string{"a", "b", "c"}},
29+
"mixed_delimiters": {input: "a\r\n\fb", want: []string{"a", "b"}},
30+
"only_delimiters": {input: "\n\r\f", want: nil},
31+
}
32+
33+
for name, tc := range tests {
34+
t.Run(name, func(t *testing.T) {
35+
t.Parallel()
36+
37+
scanner := bufio.NewScanner(strings.NewReader(tc.input))
38+
scanner.Split(split)
39+
40+
var got []string
41+
for scanner.Scan() {
42+
if text := scanner.Text(); text != "" {
43+
got = append(got, text)
44+
}
45+
}
46+
47+
if err := scanner.Err(); err != nil {
48+
t.Fatalf("unexpected scanner error: %v", err)
49+
}
50+
51+
if diff := cmp.Diff(tc.want, got); diff != "" {
52+
t.Errorf("mismatch (-want +got):\n%s", diff)
53+
}
54+
})
55+
}
56+
}

0 commit comments

Comments
 (0)