Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aws/logs_monitoring_go/internal/client/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
)

// See https://github.com/aws/aws-sdk-go-v2/issues/3416 for future configuration
const timeout = 10 * time.Second

var (
Expand Down
39 changes: 39 additions & 0 deletions aws/logs_monitoring_go/internal/forwarding/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package forwarding

import (
"net"
"net/http"
"time"
)

// Timeouts and retry budget used to build and drive the intake HTTP client.
const (
// dialContextTimeout is the net.Dialer Timeout installed by newClient.
dialContextTimeout = 1 * time.Second
// dialContextKeepAlive is the net.Dialer KeepAlive installed by newClient.
dialContextKeepAlive = 60 * time.Second
// tlsHandshakeTimeout is assigned to the transport's TLSHandshakeTimeout.
tlsHandshakeTimeout = 2 * time.Second
// timeout is the deadline Forwarder.Send puts on each request's context.
timeout = 7 * time.Second
// defaultMaxAttempts is the attempt count newClient hands to WithRetry.
defaultMaxAttempts = 3
)

// Client is the package-level HTTP client, built once by newClient when the
// package is initialized.
var Client = newClient()

func newClient() *http.Client {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSHandshakeTimeout = tlsHandshakeTimeout
transport.MaxIdleConnsPerHost = MaxConcurrency
transport.DialContext = (&net.Dialer{
Timeout: dialContextTimeout,
KeepAlive: dialContextKeepAlive,
}).DialContext
return &http.Client{
Transport: WithCompression(
WithRetry(defaultMaxAttempts,
transport,
),
),
}
}
108 changes: 57 additions & 51 deletions aws/logs_monitoring_go/internal/forwarding/forwarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ package forwarding

import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"time"
"sync"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/batching"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
Expand All @@ -22,96 +22,102 @@ import (
"golang.org/x/sync/errgroup"
)

const numWorkers = 3

var Client *http.Client = &http.Client{Timeout: 10 * time.Second}
const MaxConcurrency = 5

type Forwarder struct {
config *config.Config
cfg *config.Config
Comment thread
ndakkoune marked this conversation as resolved.
client *http.Client
storage string
}

func NewForwarder(cfg *config.Config, client *http.Client, storage string) Forwarder {
return Forwarder{
config: cfg,
cfg: cfg,
client: client,
storage: storage,
}
}

func (f Forwarder) Start(ctx context.Context, in <-chan model.LogEntry) error {
eg, ctx := errgroup.WithContext(ctx)

batches := make(chan []byte)
batches := make(chan []byte, MaxConcurrency)
batcher := batching.NewBatcher()
eg.Go(func() error {

producerErrCh := make(chan error, 1)
Comment thread
ndakkoune marked this conversation as resolved.
go func() {
defer close(batches)
return batcher.Batch(ctx, in, batches)
})
producerErrCh <- batcher.Batch(ctx, in, batches)
}()

var eg errgroup.Group
eg.SetLimit(MaxConcurrency)

var errs []error
var mu sync.Mutex

for {
body, ok, err := concurrent.SafeReader(ctx, batches)
if err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
if !ok {
break
}

for range numWorkers {
eg.Go(func() error {
for {
body, ok, err := concurrent.SafeReader(ctx, batches)
if err != nil {
return err
}
if !ok {
return nil
}

if err := f.send(ctx, body); err != nil {
return err
}
if err := f.Send(ctx, body); err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
return nil
})
}
_ = eg.Wait()

return eg.Wait()
return errors.Join(append(errs, <-producerErrCh)...)
}

// TODO: add retry mechanism for resiliency
func (f Forwarder) send(ctx context.Context, body []byte) error {
var compressedBody bytes.Buffer
zw := gzip.NewWriter(&compressedBody)
if _, err := zw.Write(body); err != nil {
return fmt.Errorf("compressing body: %w", err)
}
if err := zw.Close(); err != nil {
return fmt.Errorf("closing gzip writer: %w", err)
}
func (f Forwarder) Send(ctx context.Context, payload []byte) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, f.config.IntakeURL, bytes.NewReader(compressedBody.Bytes()))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, f.cfg.IntakeURL, bytes.NewReader(payload))
if err != nil {
return err
}

req.Header.Set("DD-API-KEY", f.config.APIKey)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Encoding", "gzip")
req.Header.Set("DD-API-KEY", f.cfg.APIKey)
req.Header.Set("DD-EVP-ORIGIN", "aws_forwarder")
req.Header.Set("DD-EVP-ORIGIN-VERSION", config.ForwarderVersion)
req.Header.Set("Content-Type", "application/json")
if f.storage != "" {
req.Header.Set("DD-STORAGE-TAG", f.storage)
}

resp, err := f.client.Do(req)
if err != nil {
return fmt.Errorf("sending to intake: %w", err)
return fmt.Errorf("intake: %w", err)
}
defer func() {
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
slog.Warn("failed to drain response body", slog.Any("error", err))
}
if err := resp.Body.Close(); err != nil {
slog.Warn("failed to close response body", slog.Any("error", err))
}
}()
defer drainClose(resp)

if resp.StatusCode != http.StatusAccepted {
return fmt.Errorf("unexpected status from intake: %s", resp.Status)
body, _ := io.ReadAll(resp.Body)
if len(body) > 0 {
return fmt.Errorf("intake (http/%d): %s", resp.StatusCode, string(body))
}
return fmt.Errorf("intake (http/%d)", resp.StatusCode)
}

return nil
}

// drainClose reads whatever is left of resp.Body, discarding it, and then
// closes the body. Fully reading before closing lets the transport reuse the
// underlying connection. Both steps are best-effort: a failure is logged as a
// warning and never returned to the caller.
func drainClose(resp *http.Response) {
	body := resp.Body

	_, drainErr := io.Copy(io.Discard, body)
	if drainErr != nil {
		slog.Warn("draining response body", slog.Any("error", drainErr))
	}

	closeErr := body.Close()
	if closeErr != nil {
		slog.Warn("closing response body", slog.Any("error", closeErr))
	}
}
99 changes: 74 additions & 25 deletions aws/logs_monitoring_go/internal/forwarding/forwarding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net/http/httptest"
"sync/atomic"
"testing"
"time"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
Expand All @@ -27,9 +28,8 @@ func TestForwarder_Start(t *testing.T) {
statusCode int
storage string
entries []model.LogEntry
cancelCtx bool
wantErr bool
wantCalls int
wantErr bool
wantCalls int
}{
"single message accepted": {
statusCode: http.StatusAccepted,
Expand All @@ -55,15 +55,7 @@ func TestForwarder_Start(t *testing.T) {
storage: cloudwatchStorage,
entries: []model.LogEntry{{Message: "test payload"}},
wantErr: true,
wantCalls: 1,
},
"context cancelled": {
statusCode: http.StatusAccepted,
storage: cloudwatchStorage,
entries: []model.LogEntry{{Message: "test payload"}},
cancelCtx: true,
wantErr: true,
wantCalls: 0,
wantCalls: defaultMaxAttempts,
},
"s3 storage": {
statusCode: http.StatusAccepted,
Expand All @@ -78,16 +70,17 @@ func TestForwarder_Start(t *testing.T) {
t.Parallel()

var callCount atomic.Int32

server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
callCount.Add(1)

assert.Equal(t, "test-api-key", req.Header.Get("DD-API-KEY"), "DD-API-KEY")
assert.Equal(t, "application/json", req.Header.Get("Content-Type"), "Content-Type")
assert.Equal(t, "gzip", req.Header.Get("Content-Encoding"), "Content-Encoding")
assert.Equal(t, "aws_forwarder", req.Header.Get("DD-EVP-ORIGIN"), "DD-EVP-ORIGIN")
assert.Equal(t, config.ForwarderVersion, req.Header.Get("DD-EVP-ORIGIN-VERSION"), "DD-EVP-ORIGIN-VERSION")
assert.Equal(t, tc.storage, req.Header.Get("DD-STORAGE-TAG"), "DD-STORAGE-TAG")
if tc.storage != "" {
assert.Equal(t, tc.storage, req.Header.Get("DD-STORAGE-TAG"), "DD-STORAGE-TAG")
}

gr, err := gzip.NewReader(req.Body)
if !assert.NoError(t, err, "body is not valid gzip") {
Expand All @@ -98,33 +91,89 @@ func TestForwarder_Start(t *testing.T) {
_, err = io.ReadAll(gr)
assert.NoError(t, err, "read gzip body")

rw.WriteHeader(tc.statusCode)
w.WriteHeader(tc.statusCode)
}))
t.Cleanup(server.Close)

f := NewForwarder(&config.Config{IntakeURL: server.URL, APIKey: "test-api-key"}, server.Client(), tc.storage)

client := server.Client()
client.Transport = WithCompression(WithRetry(defaultMaxAttempts, client.Transport))
forwarder := NewForwarder(&config.Config{IntakeURL: server.URL, APIKey: "test-api-key"}, client, tc.storage)
ctx, cancel := context.WithCancel(t.Context())
t.Cleanup(cancel)

if tc.cancelCtx {
cancel()
}

in := make(chan model.LogEntry, len(tc.entries))
for _, e := range tc.entries {
in <- e
}
close(in)

err := f.Start(ctx, in)
err := forwarder.Start(ctx, in)

assert.Equal(t, tc.wantCalls, int(callCount.Load()))
if tc.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
assert.Equal(t, tc.wantCalls, int(callCount.Load()))
})
}
}

// TestForwarder_Start_Context checks that Start surfaces the context's own
// error when the context is already done before Start runs, or when its
// deadline passes while the fake intake is still stalling a request.
func TestForwarder_Start_Context(t *testing.T) {
	t.Parallel()

	type ctxFactory func(t *testing.T) (context.Context, context.CancelFunc)

	type scenario struct {
		newCtx  ctxFactory
		delay   time.Duration // how long the fake intake stalls each request
		wantErr error
	}

	canceledUpFront := func(t *testing.T) (context.Context, context.CancelFunc) {
		ctx, cancel := context.WithCancel(t.Context())
		cancel()
		return ctx, cancel
	}
	expiredUpFront := func(t *testing.T) (context.Context, context.CancelFunc) {
		return context.WithTimeout(t.Context(), -1)
	}
	expiresDuringRequest := func(t *testing.T) (context.Context, context.CancelFunc) {
		return context.WithTimeout(t.Context(), 50*time.Millisecond)
	}

	scenarios := map[string]scenario{
		"pre-canceled": {
			newCtx:  canceledUpFront,
			wantErr: context.Canceled,
		},
		"pre-timeout": {
			newCtx:  expiredUpFront,
			wantErr: context.DeadlineExceeded,
		},
		"mid-flight timeout": {
			newCtx:  expiresDuringRequest,
			delay:   100 * time.Millisecond,
			wantErr: context.DeadlineExceeded,
		},
	}

	for name, sc := range scenarios {
		t.Run(name, func(t *testing.T) {
			t.Parallel()

			stall := func(http.ResponseWriter, *http.Request) {
				time.Sleep(sc.delay)
			}
			srv := httptest.NewServer(http.HandlerFunc(stall))
			t.Cleanup(srv.Close)

			// Mirror the production transport stack on top of the test server's client.
			httpClient := srv.Client()
			httpClient.Transport = WithCompression(WithRetry(defaultMaxAttempts, httpClient.Transport))

			cfg := &config.Config{IntakeURL: srv.URL, APIKey: "test-api-key"}
			fwd := NewForwarder(cfg, httpClient, "")

			ctx, cancel := sc.newCtx(t)
			t.Cleanup(cancel)

			// A single empty entry on an already-closed channel.
			entries := make(chan model.LogEntry, 1)
			entries <- model.LogEntry{}
			close(entries)

			err := fwd.Start(ctx, entries)

			if sc.wantErr == nil {
				require.NoError(t, err)
				return
			}
			require.ErrorIs(t, err, sc.wantErr)
		})
	}
}
Loading
Loading