Skip to content

Commit 3d4d8e6

Browse files
committed
fix: flush data when exceeds buffer
1 parent 4bd7543 commit 3d4d8e6

2 files changed

Lines changed: 48 additions & 4 deletions

File tree

aws/logs_monitoring_go/internal/parsing/scanner.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"regexp"
1313
)
1414

15+
// maxTokenSize is the largest token, in bytes, the scanner will buffer
// (512 KiB). It is passed to bufio.Scanner.Buffer as the maximum buffer size,
// and the split functions flush whatever they hold as a token once the
// pending data reaches this length.
const maxTokenSize = 512 << 10
16+
1517
type Scanner struct {
1618
*bufio.Scanner
1719
rg *regexp.Regexp
@@ -23,6 +25,8 @@ func NewScanner(r io.Reader, rg *regexp.Regexp) *Scanner {
2325
rg: rg,
2426
}
2527

28+
s.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxTokenSize)
29+
2630
if rg != nil {
2731
s.Split(s.splitOnRegex)
2832
} else {
@@ -33,7 +37,7 @@ func NewScanner(r io.Reader, rg *regexp.Regexp) *Scanner {
3337
}
3438

3539
func (s *Scanner) splitOnRegex(data []byte, atEOF bool) (advance int, token []byte, err error) {
36-
if atEOF && len(data) == 0 {
40+
if len(data) == 0 {
3741
return 0, nil, nil
3842
}
3943

@@ -43,15 +47,15 @@ func (s *Scanner) splitOnRegex(data []byte, atEOF bool) (advance int, token []by
4347
return splitAt, data[:splitAt], nil
4448
}
4549

46-
if atEOF {
50+
if atEOF || len(data) >= maxTokenSize {
4751
return len(data), data, nil
4852
}
4953

5054
return 0, nil, nil
5155
}
5256

5357
func (s *Scanner) splitOnLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
54-
if atEOF && len(data) == 0 {
58+
if len(data) == 0 {
5559
return 0, nil, nil
5660
}
5761

@@ -63,7 +67,7 @@ func (s *Scanner) splitOnLines(data []byte, atEOF bool) (advance int, token []by
6367
return j, data[:i], nil
6468
}
6569

66-
if atEOF {
70+
if atEOF || len(data) >= maxTokenSize {
6771
return len(data), data, nil
6872
}
6973

aws/logs_monitoring_go/internal/parsing/scanner_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ func TestScanner(t *testing.T) {
3333
"lines_mixed_delimiters": {input: "a\r\n\fb", rg: nil, want: []string{"a", "b"}},
3434
"lines_only_delimiters": {input: "\n\r\f", rg: nil, want: nil},
3535

36+
"regex_empty": {input: "", rg: dateRegex, want: nil},
37+
"regex_not_matching_at_start": {input: "ERROR something2024-01-15 ERROR something", rg: dateRegex, want: []string{"ERROR something", "2024-01-15 ERROR something"}},
3638
"regex_single_entry": {input: "2024-01-15 ERROR something", rg: dateRegex, want: []string{"2024-01-15 ERROR something"}},
3739
"regex_two_entries_with_newline": {input: "2024-01-15 ERROR\n2024-01-16 INFO", rg: dateRegex, want: []string{"2024-01-15 ERROR\n", "2024-01-16 INFO"}},
3840
"regex_continuation_lines": {input: "2024-01-15 ERROR\n at com.foo\n2024-01-16 INFO", rg: dateRegex, want: []string{"2024-01-15 ERROR\n at com.foo\n", "2024-01-16 INFO"}},
@@ -65,3 +67,41 @@ func TestScanner(t *testing.T) {
6567
})
6668
}
6769
}
70+
71+
func TestScannerWithOversizedToken(t *testing.T) {
72+
t.Parallel()
73+
74+
overflow := 100
75+
oversized := strings.Repeat("a", maxTokenSize+overflow)
76+
77+
tests := map[string]struct {
78+
rg *regexp.Regexp
79+
}{
80+
"lines": {rg: nil},
81+
"regex": {rg: regexp.MustCompile(`\d{4}-\d{2}-\d{2}`)},
82+
}
83+
84+
for name, tc := range tests {
85+
t.Run(name, func(t *testing.T) {
86+
t.Parallel()
87+
scanner := NewScanner(strings.NewReader(oversized), tc.rg)
88+
89+
var got []string
90+
for scanner.Scan() {
91+
got = append(got, scanner.Text())
92+
}
93+
if err := scanner.Err(); err != nil {
94+
t.Fatalf("unexpected error: %v", err)
95+
}
96+
if len(got) != 2 {
97+
t.Fatalf("want 2 tokens, got %d", len(got))
98+
}
99+
if len(got[0]) != maxTokenSize {
100+
t.Errorf("first token: want %d bytes, got %d", maxTokenSize, len(got[0]))
101+
}
102+
if len(got[1]) != overflow {
103+
t.Errorf("second token: want %d bytes, got %d", overflow, len(got[1]))
104+
}
105+
})
106+
}
107+
}

0 commit comments

Comments
 (0)