diff --git a/aws/logs_monitoring_go/cmd/forwarder/main.go b/aws/logs_monitoring_go/cmd/forwarder/main.go index 5b8f8649b..76eee1c5c 100644 --- a/aws/logs_monitoring_go/cmd/forwarder/main.go +++ b/aws/logs_monitoring_go/cmd/forwarder/main.go @@ -11,6 +11,8 @@ import ( "log/slog" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/parsing" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/pipeline" "github.com/aws/aws-lambda-go/lambda" ) @@ -26,10 +28,15 @@ func main() { lambda.Start(handleRequest(cfg)) } -// cfg not used for now, will be when forwarding logic added func handleRequest(cfg *config.Config) func(context.Context, json.RawMessage) error { return func(ctx context.Context, event json.RawMessage) error { - slog.Info("received event", slog.String("event", string(event))) - return nil + invocationSource := parsing.DetectInvocationSource(event) + switch invocationSource { + case parsing.InvocationSourceCloudwatchLogs: + return pipeline.Run(ctx, event, cfg, parsing.HandleCloudwatchLogs) + default: + slog.Error("unsupported invocation source", slog.String("source", invocationSource.String())) + return nil + } } } diff --git a/aws/logs_monitoring_go/internal/forwarding/forwarding.go b/aws/logs_monitoring_go/internal/forwarding/forwarding.go index d49ce98d9..fc2b3b195 100644 --- a/aws/logs_monitoring_go/internal/forwarding/forwarding.go +++ b/aws/logs_monitoring_go/internal/forwarding/forwarding.go @@ -16,31 +16,48 @@ import ( "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "golang.org/x/sync/errgroup" ) +const numWorkers = 3 + type Forwarder struct { - Config config.Config - Client *http.Client + config *config.Config + client *http.Client +} + +func NewForwarder(config *config.Config, client *http.Client) Forwarder { + return Forwarder{ + config: config, + client: client, + } } func (f Forwarder) Forward(ctx context.Context, in <-chan []byte) error { - for { - body, ok, err := concurrent.SafeReader(ctx, in) - if err != nil { - return err - } - if !ok { - break - } + g, ctx := errgroup.WithContext(ctx) - if err := f.send(ctx, body); err != nil { - return err - } + for range numWorkers { + g.Go(func() error { + for { + body, ok, err := concurrent.SafeReader(ctx, in) + if err != nil { + return err + } + if !ok { + return nil + } + + if err := f.send(ctx, body); err != nil { + return err + } + } + }) } - return nil + return g.Wait() } +// TODO: add retry mechanism for resiliency func (f Forwarder) send(ctx context.Context, body []byte) error { var compressedBody bytes.Buffer zw := gzip.NewWriter(&compressedBody) @@ -51,18 +68,19 @@ func (f Forwarder) send(ctx context.Context, body []byte) error { return fmt.Errorf("closing gzip writer: %w", err) } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, f.Config.IntakeURL, bytes.NewReader(compressedBody.Bytes())) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, f.config.IntakeURL, bytes.NewReader(compressedBody.Bytes())) if err != nil { return err } - req.Header.Set("DD-API-KEY", f.Config.APIKey) + req.Header.Set("DD-API-KEY", f.config.APIKey) req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Encoding", "gzip") req.Header.Set("DD-EVP-ORIGIN", "aws_forwarder") req.Header.Set("DD-EVP-ORIGIN-VERSION", config.ForwarderVersion) + req.Header.Set("DD-STORAGE-TAG", "cloudwatch") - resp, err := f.Client.Do(req) + resp, err := f.client.Do(req) if err != nil { return fmt.Errorf("sending to intake: %w", err) } diff --git a/aws/logs_monitoring_go/internal/forwarding/forwarding_test.go b/aws/logs_monitoring_go/internal/forwarding/forwarding_test.go index 864c21db4..20468371a 100644 --- a/aws/logs_monitoring_go/internal/forwarding/forwarding_test.go +++ b/aws/logs_monitoring_go/internal/forwarding/forwarding_test.go @@ -111,13 +111,10 @@ func TestForward(t *testing.T) { })) defer server.Close() - f := Forwarder{ - Config: config.Config{ - IntakeURL: server.URL, - APIKey: "test-api-key", - }, - Client: server.Client(), - } + f := NewForwarder(&config.Config{ + IntakeURL: server.URL, + APIKey: "test-api-key", + }, server.Client()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go new file mode 100644 index 000000000..9e9383b91 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/cloudwatchlogs.go @@ -0,0 +1,148 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "strings" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambdacontext" +) + +func HandleCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) error { + logEntries, err := parseCloudwatchLogs(ctx, event, cfg) + if err != nil { + return err + } + + for _, logEntry := range logEntries { + if err := concurrent.SafeSender(ctx, out, logEntry); err != nil { + return err + } + } + + return nil +} + +func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config) ([]model.CloudwatchLogEntry, error) { + var cwEvent events.CloudwatchLogsEvent + if err := json.Unmarshal(event, &cwEvent); err != nil { + return nil, fmt.Errorf("unmarshal: %w", err) + } + + data, err := cwEvent.AWSLogs.Parse() + if err != nil { + return nil, fmt.Errorf("parse: %w", err) + } + + if data.MessageType == "CONTROL_MESSAGE" { + return nil, nil + } + + source := getCloudwatchSource(cfg.Source, data.LogGroup, data.LogStream) + metadata := getCloudwatchMetadata(ctx, data) + host := getCloudwatchHost(cfg.Host, data.LogGroup) + tags, service := getTagsAndService(*cfg) + if service == "" { + service = source + } + + var entries []model.CloudwatchLogEntry + for _, le := range data.LogEvents { + ddtags, ddtagsService, message := extractFromMessage(le.Message) + entryService := service + if ddtagsService != "" { + entryService = ddtagsService + } + ddtags = append(ddtags, "service:"+entryService) + + entry := model.CloudwatchLogEntry{ + ID: le.ID, + Timestamp: le.Timestamp, + Message: message, + Source: source, + Service: entryService, + Host: host, + Tags: append(ddtags, tags...), + AWS: metadata, + } + entries = append(entries, entry) + } + + return entries, nil +} + +func getCloudwatchSource(sourceOverride, logGroup, logStream string) string { + if sourceOverride != "" { + return sourceOverride + } + + if strings.HasPrefix(logStream, "states/") { + return "stepfunction" + } + + if strings.Contains(logStream, "_CloudTrail_") { + return "cloudtrail" + } + + return getSourceFromLogGroup(strings.ToLower(logGroup)) +} + +func getSourceFromLogGroup(logGroupLower string) string { + if strings.HasPrefix(logGroupLower, "_cloudtrail_") { + return "cloudtrail" + } + if strings.HasPrefix(logGroupLower, "/aws/kinesis") { + return "kinesis" + } + if strings.HasPrefix(logGroupLower, "/aws/lambda") { + return "lambda" + } + if strings.HasPrefix(logGroupLower, "sns/") { + return "sns" + } + if strings.Contains(logGroupLower, "cloudtrail") { + return "cloudtrail" + } + return "cloudwatch" +} + +func getCloudwatchMetadata(ctx context.Context, data events.CloudwatchLogsData) model.CloudwatchMetadata { + metadata := model.CloudwatchMetadata{ + Logs: model.CloudwatchLogsContext{ + LogGroup: data.LogGroup, + LogStream: data.LogStream, + Owner: data.Owner, + }, + } + + if lambdacontext.FunctionVersion != "$LATEST" { + metadata.FunctionVersion = lambdacontext.FunctionVersion + } + + if lc, ok := lambdacontext.FromContext(ctx); ok { + metadata.InvokedFunctionARN = lc.InvokedFunctionArn + } else { + slog.Warn("failed to load lambda context, this should not happen in production. The code is either not running from AWS Lambda or context is broken.") + } + + return metadata +} + +func getCloudwatchHost(hostOverride, logGroup string) string { + if hostOverride != "" { + return hostOverride + } + + return logGroup +} diff --git a/aws/logs_monitoring_go/internal/parsing/invocationsource_string.go b/aws/logs_monitoring_go/internal/parsing/invocationsource_string.go new file mode 100644 index 000000000..7d7b15b95 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/invocationsource_string.go @@ -0,0 +1,29 @@ +// Code generated by "stringer -type InvocationSource -trimprefix InvocationSource"; DO NOT EDIT. + +package parsing + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[InvocationSourceUnknown-0] + _ = x[InvocationSourceCloudwatchLogs-1] + _ = x[InvocationSourceS3-2] + _ = x[InvocationSourceSNS-3] + _ = x[InvocationSourceSQS-4] + _ = x[InvocationSourceKinesis-5] +} + +const _InvocationSource_name = "UnknownCloudwatchLogsS3SNSSQSKinesis" + +var _InvocationSource_index = [...]uint8{0, 7, 21, 23, 26, 29, 36} + +func (i InvocationSource) String() string { + idx := int(i) - 0 + if i < 0 || idx >= len(_InvocationSource_index)-1 { + return "InvocationSource(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _InvocationSource_name[_InvocationSource_index[idx]:_InvocationSource_index[idx+1]] +} diff --git a/aws/logs_monitoring_go/internal/parsing/parsing.go b/aws/logs_monitoring_go/internal/parsing/parsing.go index fe8b41461..f80440735 100644 --- a/aws/logs_monitoring_go/internal/parsing/parsing.go +++ b/aws/logs_monitoring_go/internal/parsing/parsing.go @@ -7,100 +7,83 @@ package parsing import ( "bytes" - "context" "encoding/json" - "log/slog" - - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" - "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" ) -type invocationSource int +//go:generate stringer -type InvocationSource -trimprefix InvocationSource +type InvocationSource int const ( - invocationSourceUnknown invocationSource = iota - invocationSourceCloudwatchLogs - invocationSourceS3 - invocationSourceSNS - invocationSourceSQS - invocationSourceKinesis + InvocationSourceUnknown InvocationSource = iota + InvocationSourceCloudwatchLogs + InvocationSourceS3 + InvocationSourceSNS + InvocationSourceSQS + InvocationSourceKinesis ) -func Parse(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) { - switch detectInvocationSource(event) { - case invocationSourceCloudwatchLogs: - parseCloudwatchLogs(ctx, event, cfg, out) - default: - slog.Error("unsupported invocation source", slog.String("event", string(event))) - } -} - -func detectInvocationSource(event json.RawMessage) invocationSource { +func DetectInvocationSource(event json.RawMessage) InvocationSource { dec := json.NewDecoder(bytes.NewReader(event)) t, err := dec.Token() if err != nil || t != json.Delim('{') { - return invocationSourceUnknown + return InvocationSourceUnknown } key, err := dec.Token() if err != nil { - return invocationSourceUnknown + return InvocationSourceUnknown } if key == "awslogs" { - return invocationSourceCloudwatchLogs + return InvocationSourceCloudwatchLogs } if key == "Records" { return detectFromRecords(dec) } - return invocationSourceUnknown + return InvocationSourceUnknown } -func detectFromRecords(dec *json.Decoder) invocationSource { +func detectFromRecords(dec *json.Decoder) InvocationSource { t, err := dec.Token() if err != nil || t != json.Delim('[') { - return invocationSourceUnknown + return InvocationSourceUnknown } t, err = dec.Token() if err != nil || t != json.Delim('{') { - return invocationSourceUnknown + return InvocationSourceUnknown } for dec.More() { key, err := dec.Token() if err != nil { - return invocationSourceUnknown + return InvocationSourceUnknown } if key == "eventSource" { val, err := dec.Token() if err != nil { - return invocationSourceUnknown + return InvocationSourceUnknown } switch val { case "aws:s3": - return invocationSourceS3 + return InvocationSourceS3 case "aws:sns": - return invocationSourceSNS + return InvocationSourceSNS case "aws:sqs": - return invocationSourceSQS + return InvocationSourceSQS case "aws:kinesis": - return invocationSourceKinesis + return InvocationSourceKinesis default: - return invocationSourceUnknown + return InvocationSourceUnknown } } var skip json.RawMessage if err := dec.Decode(&skip); err != nil { - return invocationSourceUnknown + return InvocationSourceUnknown } } - return invocationSourceUnknown -} - -func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) { - + return InvocationSourceUnknown } diff --git a/aws/logs_monitoring_go/internal/parsing/parsing_bench_test.go b/aws/logs_monitoring_go/internal/parsing/parsing_bench_test.go index ff119f61b..ecd5c3765 100644 --- a/aws/logs_monitoring_go/internal/parsing/parsing_bench_test.go +++ b/aws/logs_monitoring_go/internal/parsing/parsing_bench_test.go @@ -34,7 +34,7 @@ func BenchmarkDetectInvocationSource(b *testing.B) { for _, tc := range benchCases { b.Run(tc.name, func(b *testing.B) { for b.Loop() { - detectInvocationSource(tc.event) + DetectInvocationSource(tc.event) } }) } diff --git a/aws/logs_monitoring_go/internal/parsing/parsing_test.go b/aws/logs_monitoring_go/internal/parsing/parsing_test.go index 8072f532d..687edab57 100644 --- a/aws/logs_monitoring_go/internal/parsing/parsing_test.go +++ b/aws/logs_monitoring_go/internal/parsing/parsing_test.go @@ -13,33 +13,33 @@ import ( func TestDetectInvocationSource(t *testing.T) { tests := map[string]struct { event json.RawMessage - wantKey invocationSource + wantKey InvocationSource }{ "cloudwatch logs": { event: json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`), - wantKey: invocationSourceCloudwatchLogs, + wantKey: InvocationSourceCloudwatchLogs, }, "empty object": { event: json.RawMessage(`{}`), - wantKey: invocationSourceUnknown, + wantKey: InvocationSourceUnknown, }, "s3 not yet implemented": { event: json.RawMessage(`{"Records":[{"s3":{"bucket":{"name":"my-bucket"},"object":{"key":"my-key"}}}]}`), - wantKey: invocationSourceUnknown, + wantKey: InvocationSourceUnknown, }, "not JSON": { event: json.RawMessage(`not a json`), - wantKey: invocationSourceUnknown, + wantKey: InvocationSourceUnknown, }, "empty input": { event: json.RawMessage(``), - wantKey: invocationSourceUnknown, + wantKey: InvocationSourceUnknown, }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { - got := detectInvocationSource(tc.event) + got := DetectInvocationSource(tc.event) if got != tc.wantKey { t.Errorf("got %d, want %d", got, tc.wantKey) } diff --git a/aws/logs_monitoring_go/internal/parsing/tags.go b/aws/logs_monitoring_go/internal/parsing/tags.go new file mode 100644 index 000000000..6c3b94f77 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/tags.go @@ -0,0 +1,88 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "encoding/json" + "strings" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" + "github.com/aws/aws-lambda-go/lambdacontext" +) + +const DdtagsKey = "ddtags" + +func getTagsAndService(cfg config.Config) (model.Tags, string) { + var tags model.Tags + var service string + + if cfg.CustomTags != "" { + for tag := range strings.SplitSeq(cfg.CustomTags, ",") { + if strings.HasPrefix(tag, "service:") { + if service == "" { + service = tag[8:] + } + continue + } + + tags = append(tags, tag) + } + } + + if cfg.Source != "" { + tags = append(tags, "source_overridden:true") + } + + tags = append(tags, + "forwardername:"+strings.ToLower(lambdacontext.FunctionName), + "forwarder_version:"+config.ForwarderVersion, + ) + + return tags, service +} + +func extractFromMessage(message string) (model.Tags, string, string) { + var tags model.Tags + var service string + + var jsonMessage map[string]any + if err := json.Unmarshal([]byte(message), &jsonMessage); err != nil { + return nil, service, message + } + + ddtagsRaw, ok := jsonMessage[DdtagsKey] + if !ok { + return nil, service, message + } + + ddtagsStr, ok := ddtagsRaw.(string) + if !ok { + return nil, service, message + } + + ddtagsStr = strings.ReplaceAll(ddtagsStr, " ", "") + + for tag := range strings.SplitSeq(ddtagsStr, ",") { + if strings.HasPrefix(tag, "service:") { + if service == "" { + service = tag[8:] + } + continue + } + + tags = append(tags, tag) + } + + delete(jsonMessage, DdtagsKey) + + newMessage, err := json.Marshal(jsonMessage) + if err != nil { + return nil, service, message + } + + return tags, service, string(newMessage) +} diff --git a/aws/logs_monitoring_go/internal/parsing/tags_test.go b/aws/logs_monitoring_go/internal/parsing/tags_test.go new file mode 100644 index 000000000..dfa873892 --- /dev/null +++ b/aws/logs_monitoring_go/internal/parsing/tags_test.go @@ -0,0 +1,164 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package parsing + +import ( + "encoding/json" + "slices" + "strings" + "testing" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" +) + +func TestExtractFromMessage(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + message string + wantTags model.Tags + wantService string + wantMessage string + }{ + "empty string": { + message: "", + wantTags: nil, + wantService: "", + wantMessage: "", + }, + "plain text": { + message: "ERROR something went wrong", + wantTags: nil, + wantService: "", + wantMessage: "ERROR something went wrong", + }, + "invalid json": { + message: `{not valid json}`, + wantTags: nil, + wantService: "", + wantMessage: `{not valid json}`, + }, + "json without ddtags": { + message: `{"level":"INFO","msg":"hello"}`, + wantTags: nil, + wantService: "", + wantMessage: `{"level":"INFO","msg":"hello"}`, + }, + "ddtags is not a string": { + message: `{"ddtags":["tag1","tag2"]}`, + wantTags: nil, + wantService: "", + wantMessage: `{"ddtags":["tag1","tag2"]}`, + }, + "single tag": { + message: `{"msg":"hello","ddtags":"env:prod"}`, + wantTags: model.Tags{"env:prod"}, + wantService: "", + wantMessage: `{"msg":"hello"}`, + }, + "multiple tags": { + message: `{"msg":"hello","ddtags":"env:prod,team:backend"}`, + wantTags: model.Tags{"env:prod", "team:backend"}, + wantService: "", + wantMessage: `{"msg":"hello"}`, + }, + "tags with spaces are cleaned": { + message: `{"msg":"hello","ddtags":"env:prod, team:backend, version:1.0"}`, + wantTags: model.Tags{"env:prod", "team:backend", "version:1.0"}, + wantService: "", + wantMessage: `{"msg":"hello"}`, + }, + "service tag extracted": { + message: `{"msg":"hello","ddtags":"service:my-app,env:prod"}`, + wantTags: model.Tags{"env:prod"}, + wantService: "my-app", + wantMessage: `{"msg":"hello"}`, + }, + "service only": { + message: `{"msg":"hello","ddtags":"service:my-app"}`, + wantTags: nil, + wantService: "my-app", + wantMessage: `{"msg":"hello"}`, + }, + "first service wins": { + message: `{"msg":"hello","ddtags":"service:first,service:second,env:prod"}`, + wantTags: model.Tags{"env:prod"}, + wantService: "first", + wantMessage: `{"msg":"hello"}`, + }, + "ddtags is empty string": { + message: `{"msg":"hello","ddtags":""}`, + wantTags: model.Tags{""}, + wantService: "", + wantMessage: `{"msg":"hello"}`, + }, + "ddtags only field in json": { + message: `{"ddtags":"env:prod"}`, + wantTags: model.Tags{"env:prod"}, + wantService: "", + wantMessage: `{}`, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + t.Parallel() + + gotTags, gotService, gotMessage := extractFromMessage(tc.message) + + if !slices.Equal(gotTags, tc.wantTags) { + t.Errorf("tags: got %v, want %v", gotTags, tc.wantTags) + } + if gotService != tc.wantService { + t.Errorf("service: got %q, want %q", gotService, tc.wantService) + } + if gotMessage != tc.wantMessage { + t.Errorf("message: got %q, want %q", gotMessage, tc.wantMessage) + } + }) + } +} + +func FuzzExtractFromMessage(f *testing.F) { + seeds := []string{ + "", + "plain text, not json", + `{not valid json}`, + `{"msg":"hello"}`, + `{"ddtags":["tag1","tag2"]}`, + `{"ddtags":42}`, + `{"msg":"hello","ddtags":"env:prod"}`, + `{"msg":"hello","ddtags":"env:prod, team:backend, version:1.0"}`, + `{"msg":"hello","ddtags":"service:my-app,env:prod"}`, + `{"msg":"hello","ddtags":"service:my-app"}`, + `{"msg":"hello","ddtags":"service:first,service:second,env:prod"}`, + `{"msg":"hello","ddtags":""}`, + `{"ddtags":"env:prod"}`, + } + for _, seed := range seeds { + f.Add(seed) + } + + f.Fuzz(func(t *testing.T, message string) { + tags, _, outMessage := extractFromMessage(message) + + if outMessage != message { + var parsed map[string]any + if err := json.Unmarshal([]byte(outMessage), &parsed); err != nil { + t.Errorf("output message is not valid JSON: %v", err) + } + if _, ok := parsed[DdtagsKey]; ok { + t.Errorf("output message still contains %q key", DdtagsKey) + } + } + + for _, tag := range tags { + if strings.HasPrefix(tag, "service:") { + t.Errorf("tag %q should have been extracted as service, not returned in tags", tag) + } + } + }) +} diff --git a/aws/logs_monitoring_go/internal/pipeline/pipeline.go b/aws/logs_monitoring_go/internal/pipeline/pipeline.go new file mode 100644 index 000000000..9817d562e --- /dev/null +++ b/aws/logs_monitoring_go/internal/pipeline/pipeline.go @@ -0,0 +1,53 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-Present Datadog, Inc. + +package pipeline + +import ( + "context" + "encoding/json" + "net/http" + "time" + + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/forwarding" + "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/processing" + "golang.org/x/sync/errgroup" +) + +const timeout = 10 * time.Second + +func Run[T any]( + ctx context.Context, + event json.RawMessage, + cfg *config.Config, + handler func(context.Context, json.RawMessage, *config.Config, chan<- T) error, +) error { + g, ctx := errgroup.WithContext(ctx) // Fragile because if one goroutine fails, all stages will stop gracefully + + entries := make(chan T) + batches := make(chan []byte) + + batcher := processing.NewBatcher[T]() + forwarder := forwarding.NewForwarder(cfg, &http.Client{ + Timeout: timeout, + }) + + g.Go(func() error { + defer close(entries) + return handler(ctx, event, cfg, entries) + }) + + g.Go(func() error { + defer close(batches) + return batcher.Batch(ctx, entries, batches) + }) + + g.Go(func() error { + return forwarder.Forward(ctx, batches) + }) + + return g.Wait() +}