[AWSINTS-3515] feat(go-forwarder): add SNS S3 wrapper + refactor S3#1111
[AWSINTS-3515] feat(go-forwarder): add SNS S3 wrapper + refactor S3#1111ndakkoune wants to merge 6 commits into
Conversation
| ) | ||
|
|
||
| type s3RecordContext struct { | ||
| type s3Record struct { |
There was a problem hiding this comment.
Small refactor to not confuse this the context package.
| return client, metadata, nil | ||
| } | ||
|
|
||
| func handleS3Event(ctx context.Context, event json.RawMessage, cfg *config.Config, client S3APIClient, metadata model.Metadata, out chan<- model.S3LogEntry) error { |
There was a problem hiding this comment.
handleS3Event will be called for each events.SNSEvent.Records entry and we don't want to create multiple S3 clients : we initialise it outside
|
|
||
| rc := s3RecordContext{ | ||
| forwarderMetadata, tags, source, service, bucket, key, cfg.S3MultilineLogRegex, | ||
| service = cmp.Or(service, source) |
There was a problem hiding this comment.
More elegant solution. Will update the Cloudwatch flow also in a next PR (avoids cluttering this one).
| } | ||
| if err := processS3Record(ctx, client, out, rc); err != nil { | ||
| return fmt.Errorf("process S3 record: %w", err) | ||
| slog.WarnContext(ctx, "skipping s3 record", |
There was a problem hiding this comment.
Standardizing the behavior between handlers.
| } | ||
|
|
||
| ddtags = append(ddtags, "service:"+entryService) | ||
| tags := slices.Concat(ddtags, model.Tags{"service:" + entryService}, rc.tags) |
There was a problem hiding this comment.
More elegant and performant solution since it is capped at most 1 allocation.
| } | ||
|
|
||
| for _, record := range snsEvent.Records { | ||
| s3Record := json.RawMessage(record.SNS.Message) |
There was a problem hiding this comment.
Could have had move this to a handleSNSRecord function but it would have take too many arguments to make it clean.
What does this PR do?
Adds the S3 message wrapping into SNS
Motivation
Testing Guidelines
Additional Notes
Types of changes
Check all that apply