Skip to content

Commit 2e33366

Browse files
committed
Switch from iter.Seq to iter.Seq2 without error in s3Record struct
1 parent 3270f73 commit 2e33366

4 files changed

Lines changed: 21 additions & 22 deletions

File tree

aws/logs_monitoring_go/internal/handling/cloudtrail.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,45 +30,45 @@ var ec2InstanceRegexp = sync.OnceValue(func() *regexp.Regexp {
3030
return regexp.MustCompile(`^arn:aws:sts::.*?:assumed-role/(?P<role>.*?)/(?P<host>i-([0-9a-f]{8}|[0-9a-f]{17}))$`)
3131
})
3232

33-
func decodeCloudTrail(r io.Reader) iter.Seq[s3Record] {
34-
return func(yield func(s3Record) bool) {
33+
func decodeCloudTrail(r io.Reader) iter.Seq2[s3Record, error] {
34+
return func(yield func(s3Record, error) bool) {
3535
gz, err := gzip.NewReader(r)
3636
if err != nil {
37-
yield(s3Record{Err: fmt.Errorf("decode cloudtrail gzip: %w", err)})
37+
yield(s3Record{}, fmt.Errorf("decode cloudtrail gzip: %w", err))
3838
return
3939
}
4040
defer gz.Close() //nolint:errcheck
4141

4242
dec := json.NewDecoder(gz)
4343

4444
if t, err := dec.Token(); err != nil || t != json.Delim('{') {
45-
yield(s3Record{Err: errors.New("decode cloudtrail: expected '{' at start of JSON")})
45+
yield(s3Record{}, errors.New("decode cloudtrail: expected '{' at start of JSON"))
4646
return
4747
}
4848
if t, err := dec.Token(); err != nil || t != "Records" {
49-
yield(s3Record{Err: errors.New("decode cloudtrail: expected 'Records' key")})
49+
yield(s3Record{}, errors.New("decode cloudtrail: expected 'Records' key"))
5050
return
5151
}
5252
if t, err := dec.Token(); err != nil || t != json.Delim('[') {
53-
yield(s3Record{Err: errors.New("decode cloudtrail: expected '[' at start of Records array")})
53+
yield(s3Record{}, errors.New("decode cloudtrail: expected '[' at start of Records array"))
5454
return
5555
}
5656

5757
for dec.More() {
5858
var record map[string]any
5959
if err := dec.Decode(&record); err != nil {
60-
yield(s3Record{Err: fmt.Errorf("decode cloudtrail record: %w", err)})
60+
yield(s3Record{}, fmt.Errorf("decode cloudtrail record: %w", err))
6161
return
6262
}
6363

6464
msg, err := json.Marshal(record)
6565
if err != nil {
66-
yield(s3Record{Err: fmt.Errorf("marshal cloudtrail record: %w", err)})
66+
yield(s3Record{}, fmt.Errorf("marshal cloudtrail record: %w", err))
6767
return
6868
}
6969

7070
host := cloudtrailHost(record)
71-
if !yield(s3Record{Message: string(msg), Host: host}) {
71+
if !yield(s3Record{Message: string(msg), Host: host}, nil) {
7272
return
7373
}
7474
}
@@ -98,7 +98,7 @@ func cloudtrailHost(record map[string]any) string {
9898
re := ec2InstanceRegexp()
9999
matches := re.FindStringSubmatch(arn)
100100
if matches == nil {
101-
slog.Debug(arn + " arn did not match, cloudtrail host extraction skipped")
101+
slog.Debug(arn + " arn did not match an EC2 host instance, cloudtrail host extraction skipped")
102102
return ""
103103
}
104104

aws/logs_monitoring_go/internal/handling/cloudtrail_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,10 +198,10 @@ func TestDecodeCloudTrail(t *testing.T) {
198198
t.Parallel()
199199

200200
var got []s3Record
201-
for rec := range decodeCloudTrail(bytes.NewReader(tc.input)) {
202-
if rec.Err != nil {
201+
for rec, err := range decodeCloudTrail(bytes.NewReader(tc.input)) {
202+
if err != nil {
203203
if !tc.wantErr {
204-
t.Fatalf("unexpected error: %v", rec.Err)
204+
t.Fatalf("unexpected error: %v", err)
205205
}
206206
return
207207
}

aws/logs_monitoring_go/internal/handling/s3.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ const (
3131
type s3Record struct {
3232
Message string
3333
Host string
34-
Err error
3534
}
3635

3736
type S3Handler struct {
@@ -82,16 +81,16 @@ func (h S3Handler) processRecord(ctx context.Context, client S3APIClient, out ch
8281
}
8382
}()
8483

85-
var records iter.Seq[s3Record]
84+
var records iter.Seq2[s3Record, error]
8685
if cloudTrailRegex().MatchString(key) {
8786
records = decodeCloudTrail(body)
8887
} else {
8988
records = scan(body, h.cfg.S3MultilineLogRegex)
9089
}
9190

92-
for rec := range records {
93-
if rec.Err != nil {
94-
return rec.Err
91+
for rec, err := range records {
92+
if err != nil {
93+
return err
9594
}
9695
entry := h.newS3LogEntry(eventRecord, rec.Message, lambdaOrigin)
9796
if h.cfg.Filter.ShouldExclude(entry.Message) {

aws/logs_monitoring_go/internal/handling/scanner.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,17 +80,17 @@ func (s *Scanner) splitOnLines(data []byte, atEOF bool) (advance int, token []by
8080
return 0, nil, nil
8181
}
8282

83-
func scan(r io.Reader, re *regexp.Regexp) iter.Seq[s3Record] {
84-
return func(yield func(s3Record) bool) {
83+
func scan(r io.Reader, re *regexp.Regexp) iter.Seq2[s3Record, error] {
84+
return func(yield func(s3Record, error) bool) {
8585
scanner := NewScanner(r, re)
8686
for scanner.Scan() {
8787
message := strings.ToValidUTF8(scanner.Text(), "")
88-
if !yield(s3Record{Message: message}) {
88+
if !yield(s3Record{Message: message}, nil) {
8989
return
9090
}
9191
}
9292
if err := scanner.Err(); err != nil {
93-
yield(s3Record{Err: err})
93+
yield(s3Record{}, err)
9494
}
9595
}
9696
}

0 commit comments

Comments
 (0)