-
Notifications
You must be signed in to change notification settings - Fork 390
[AWSINTS-3515] feat(go-forwarder): add SNS S3 wrapper + refactor S3 #1111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
10a27da
e7091be
58db532
27be00e
a1df57c
3d7900a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| // Unless explicitly stated otherwise all files in this repository are licensed | ||
| // under the Apache License Version 2.0. | ||
| // This product includes software developed at Datadog (https://www.datadoghq.com/). | ||
| // Copyright 2026-Present Datadog, Inc. | ||
|
|
||
| package parsing | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "compress/gzip" | ||
| "context" | ||
| "encoding/json" | ||
| "testing" | ||
|
|
||
| "github.com/aws/aws-lambda-go/lambdacontext" | ||
| ) | ||
|
|
||
| const testARN = "arn:aws:lambda:us-east-1:123456789012:function:forwarder" | ||
|
|
||
| var testLambdaCtx = lambdacontext.NewContext(context.Background(), &lambdacontext.LambdaContext{ | ||
| InvokedFunctionArn: testARN, | ||
| }) | ||
|
|
||
| func mustGzipJSON(t *testing.T, v any) []byte { | ||
| t.Helper() | ||
|
|
||
| raw, err := json.Marshal(v) | ||
| if err != nil { | ||
| t.Fatalf("marshal: %v", err) | ||
| } | ||
|
|
||
| var buf bytes.Buffer | ||
| w := gzip.NewWriter(&buf) | ||
| if _, err := w.Write(raw); err != nil { | ||
| t.Fatalf("gzip write: %v", err) | ||
| } | ||
|
|
||
| if err := w.Close(); err != nil { | ||
| t.Fatalf("gzip close: %v", err) | ||
| } | ||
|
|
||
| return buf.Bytes() | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,11 +6,13 @@ | |
| package parsing | ||
|
|
||
| import ( | ||
| "cmp" | ||
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "log/slog" | ||
| "regexp" | ||
| "slices" | ||
| "strings" | ||
|
|
||
| "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" | ||
|
|
@@ -19,7 +21,7 @@ import ( | |
| "github.com/aws/aws-lambda-go/events" | ||
| ) | ||
|
|
||
| type s3RecordContext struct { | ||
| type s3Record struct { | ||
| metadata model.Metadata | ||
| tags model.Tags | ||
| source string | ||
|
|
@@ -30,19 +32,31 @@ type s3RecordContext struct { | |
| } | ||
|
|
||
| func HandleS3(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.S3LogEntry) error { | ||
| var s3Event events.S3Event | ||
| if err := json.Unmarshal(event, &s3Event); err != nil { | ||
| return fmt.Errorf("unmarshal: %w", err) | ||
| client, metadata, err := setupS3(ctx, cfg) | ||
| if err != nil { | ||
| return fmt.Errorf("setup s3: %w", err) | ||
| } | ||
| return handleS3Event(ctx, event, cfg, client, metadata, out) | ||
| } | ||
|
|
||
| func setupS3(ctx context.Context, cfg *config.Config) (S3APIClient, model.Metadata, error) { | ||
| client, err := createS3APIClient(ctx, cfg.UseFIPS) | ||
| if err != nil { | ||
| return fmt.Errorf("create S3 client: %w", err) | ||
| return nil, model.Metadata{}, fmt.Errorf("create S3 client: %w", err) | ||
| } | ||
|
|
||
| forwarderMetadata, err := model.GetMetadata(ctx) | ||
| metadata, err := model.GetMetadata(ctx) | ||
| if err != nil { | ||
| return err | ||
| return nil, model.Metadata{}, fmt.Errorf("get metadata: %w", err) | ||
| } | ||
|
|
||
| return client, metadata, nil | ||
| } | ||
|
|
||
| func handleS3Event(ctx context.Context, event json.RawMessage, cfg *config.Config, client S3APIClient, metadata model.Metadata, out chan<- model.S3LogEntry) error { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| var s3Event events.S3Event | ||
| if err := json.Unmarshal(event, &s3Event); err != nil { | ||
| return fmt.Errorf("unmarshal: %w", err) | ||
| } | ||
|
|
||
| for _, record := range s3Event.Records { | ||
|
|
@@ -51,30 +65,36 @@ func HandleS3(ctx context.Context, event json.RawMessage, cfg *config.Config, ou | |
|
|
||
| tags, service := getTagsAndService(cfg) | ||
| source := getS3Source(cfg.Source, key) | ||
| if service == "" { | ||
| service = source | ||
| } | ||
|
|
||
| rc := s3RecordContext{ | ||
| forwarderMetadata, tags, source, service, bucket, key, cfg.S3MultilineLogRegex, | ||
| service = cmp.Or(service, source) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. More elegant solution. Will update the Cloudwatch flow also in a next PR (avoids cluttering this one). |
||
|
|
||
| rc := s3Record{ | ||
| metadata: metadata, | ||
| tags: tags, | ||
| source: source, | ||
| service: service, | ||
| bucket: bucket, | ||
| key: key, | ||
| multilineRegex: cfg.S3MultilineLogRegex, | ||
| } | ||
| if err := processS3Record(ctx, client, out, rc); err != nil { | ||
| return fmt.Errorf("process S3 record: %w", err) | ||
| slog.WarnContext(ctx, "skipping s3 record", | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Standardizing the behavior between handlers. |
||
| "bucket", bucket, "key", key, "error", err) | ||
| continue | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func processS3Record(ctx context.Context, client S3APIClient, out chan<- model.S3LogEntry, rc s3RecordContext) error { | ||
| func processS3Record(ctx context.Context, client S3APIClient, out chan<- model.S3LogEntry, rc s3Record) error { | ||
| body, err := getS3Object(ctx, client, rc.bucket, rc.key) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| defer func() { | ||
| if err := body.Close(); err != nil { | ||
| slog.Warn("failed to close response body", slog.Any("error", err)) | ||
| slog.WarnContext(ctx, "failed to close response body", slog.Any("error", err)) | ||
| } | ||
| }() | ||
|
|
||
|
|
@@ -87,21 +107,18 @@ func processS3Record(ctx context.Context, client S3APIClient, out chan<- model.S | |
| } | ||
|
|
||
| if err := scanner.Err(); err != nil { | ||
| return fmt.Errorf("scan s3://%s/%s: %w", rc.bucket, rc.key, err) | ||
| return fmt.Errorf("scan: %w", err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func makeS3Entry(rc s3RecordContext, message string) model.S3LogEntry { | ||
| func makeS3Entry(rc s3Record, message string) model.S3LogEntry { | ||
| ddtags, ddtagsService, message := extractFromMessage(message) | ||
|
|
||
| entryService := rc.service | ||
| if ddtagsService != "" { | ||
| entryService = ddtagsService | ||
| } | ||
| entryService := cmp.Or(ddtagsService, rc.service) | ||
|
|
||
| ddtags = append(ddtags, "service:"+entryService) | ||
| tags := slices.Concat(ddtags, model.Tags{"service:" + entryService}, rc.tags) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. More elegant and performant solution since it is capped at most 1 allocation. |
||
| metadata := model.S3Metadata{ | ||
| Metadata: rc.metadata, | ||
| S3Context: model.S3Context{ | ||
|
|
@@ -115,7 +132,7 @@ func makeS3Entry(rc s3RecordContext, message string) model.S3LogEntry { | |
| Source: rc.source, | ||
| SourceCategory: sourceCategory, | ||
| Service: entryService, | ||
| Tags: append(ddtags, rc.tags...), | ||
| Tags: tags, | ||
| Metadata: metadata, | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small refactor to not confuse this the
contextpackage.