Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aws/logs_monitoring_go/internal/client/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
)

// See https://github.com/aws/aws-sdk-go-v2/issues/3416 for future configuration
const timeout = 10 * time.Second

var (
Expand Down
39 changes: 39 additions & 0 deletions aws/logs_monitoring_go/internal/forwarding/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2026-Present Datadog, Inc.

package forwarding

import (
"net"
"net/http"
"time"
)

const (
dialContextTimeout = 1 * time.Second
dialContextKeepAlive = 60 * time.Second
tlsHandshakeTimeout = 2 * time.Second
timeout = 7 * time.Second
defaultMaxAttempts = 3
)

var Client = newClient()

func newClient() *http.Client {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSHandshakeTimeout = tlsHandshakeTimeout
transport.MaxIdleConnsPerHost = maxConcurrency
transport.DialContext = (&net.Dialer{
Timeout: dialContextTimeout,
KeepAlive: dialContextKeepAlive,
}).DialContext
return &http.Client{
Transport: WithCompression(
WithRetry(defaultMaxAttempts,
transport,
),
),
}
}
104 changes: 53 additions & 51 deletions aws/logs_monitoring_go/internal/forwarding/forwarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ package forwarding

import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"time"
"sync"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/batching"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
Expand All @@ -22,96 +22,98 @@ import (
"golang.org/x/sync/errgroup"
)

const numWorkers = 3

var Client *http.Client = &http.Client{Timeout: 10 * time.Second}
const maxConcurrency = 5

type Forwarder struct {
config *config.Config
cfg *config.Config
Comment thread
ndakkoune marked this conversation as resolved.
client *http.Client
storage string
}

func NewForwarder(cfg *config.Config, client *http.Client, storage string) Forwarder {
return Forwarder{
config: cfg,
cfg: cfg,
client: client,
storage: storage,
}
}

func (f Forwarder) Start(ctx context.Context, in <-chan model.LogEntry) error {
eg, ctx := errgroup.WithContext(ctx)

batches := make(chan []byte)
batches := make(chan []byte, maxConcurrency)
batcher := batching.NewBatcher()
eg.Go(func() error {

producerErrCh := make(chan error, 1)
Comment thread
ndakkoune marked this conversation as resolved.
go func() {
defer close(batches)
return batcher.Batch(ctx, in, batches)
})
producerErrCh <- batcher.Batch(ctx, in, batches)
}()

var eg errgroup.Group
eg.SetLimit(maxConcurrency)

var errs []error
var mu sync.Mutex

for {
body, ok, err := concurrent.SafeReader(ctx, batches)
if err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
if !ok {
break
}

for range numWorkers {
eg.Go(func() error {
for {
body, ok, err := concurrent.SafeReader(ctx, batches)
if err != nil {
return err
}
if !ok {
return nil
}

if err := f.send(ctx, body); err != nil {
return err
}
if err := f.send(ctx, body); err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
}
return nil
})
}
_ = eg.Wait()

return eg.Wait()
return errors.Join(append(errs, <-producerErrCh)...)
}

// TODO: add retry mechanism for resiliency
func (f Forwarder) send(ctx context.Context, body []byte) error {
var compressedBody bytes.Buffer
zw := gzip.NewWriter(&compressedBody)
if _, err := zw.Write(body); err != nil {
return fmt.Errorf("compressing body: %w", err)
}
if err := zw.Close(); err != nil {
return fmt.Errorf("closing gzip writer: %w", err)
}
func (f Forwarder) send(ctx context.Context, payload []byte) error {
reqCtx, cancel := context.WithTimeout(ctx, timeout)
Comment thread
ndakkoune marked this conversation as resolved.
Outdated
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, f.config.IntakeURL, bytes.NewReader(compressedBody.Bytes()))
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, f.cfg.IntakeURL, bytes.NewReader(payload))
if err != nil {
return err
}

req.Header.Set("DD-API-KEY", f.config.APIKey)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Encoding", "gzip")
req.Header.Set("DD-API-KEY", f.cfg.APIKey)
req.Header.Set("DD-EVP-ORIGIN", "aws_forwarder")
req.Header.Set("DD-EVP-ORIGIN-VERSION", config.ForwarderVersion)
req.Header.Set("Content-Type", "application/json")
if f.storage != "" {
req.Header.Set("DD-STORAGE-TAG", f.storage)
}

resp, err := f.client.Do(req)
if err != nil {
return fmt.Errorf("sending to intake: %w", err)
return fmt.Errorf("intake: %w", err)
}
defer func() {
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
slog.Warn("failed to drain response body", slog.Any("error", err))
}
if err := resp.Body.Close(); err != nil {
slog.Warn("failed to close response body", slog.Any("error", err))
}
}()
defer drainClose(resp)

if resp.StatusCode != http.StatusAccepted {
return fmt.Errorf("unexpected status from intake: %s", resp.Status)
return fmt.Errorf("intake: %s", resp.Status)
Comment thread
ndakkoune marked this conversation as resolved.
Outdated
}

return nil
}

func drainClose(resp *http.Response) {
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
slog.Warn("draining response body", slog.Any("error", err))
}
if err := resp.Body.Close(); err != nil {
slog.Warn("closing response body", slog.Any("error", err))
}
}
99 changes: 74 additions & 25 deletions aws/logs_monitoring_go/internal/forwarding/forwarding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net/http/httptest"
"sync/atomic"
"testing"
"time"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
Expand All @@ -27,9 +28,8 @@ func TestForwarder_Start(t *testing.T) {
statusCode int
storage string
entries []model.LogEntry
cancelCtx bool
wantErr bool
wantCalls int
wantErr bool
wantCalls int
}{
"single message accepted": {
statusCode: http.StatusAccepted,
Expand All @@ -55,15 +55,7 @@ func TestForwarder_Start(t *testing.T) {
storage: cloudwatchStorage,
entries: []model.LogEntry{{Message: "test payload"}},
wantErr: true,
wantCalls: 1,
},
"context cancelled": {
statusCode: http.StatusAccepted,
storage: cloudwatchStorage,
entries: []model.LogEntry{{Message: "test payload"}},
cancelCtx: true,
wantErr: true,
wantCalls: 0,
wantCalls: defaultMaxAttempts,
},
"s3 storage": {
statusCode: http.StatusAccepted,
Expand All @@ -78,16 +70,17 @@ func TestForwarder_Start(t *testing.T) {
t.Parallel()

var callCount atomic.Int32

server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
callCount.Add(1)

assert.Equal(t, "test-api-key", req.Header.Get("DD-API-KEY"), "DD-API-KEY")
assert.Equal(t, "application/json", req.Header.Get("Content-Type"), "Content-Type")
assert.Equal(t, "gzip", req.Header.Get("Content-Encoding"), "Content-Encoding")
assert.Equal(t, "aws_forwarder", req.Header.Get("DD-EVP-ORIGIN"), "DD-EVP-ORIGIN")
assert.Equal(t, config.ForwarderVersion, req.Header.Get("DD-EVP-ORIGIN-VERSION"), "DD-EVP-ORIGIN-VERSION")
assert.Equal(t, tc.storage, req.Header.Get("DD-STORAGE-TAG"), "DD-STORAGE-TAG")
if tc.storage != "" {
assert.Equal(t, tc.storage, req.Header.Get("DD-STORAGE-TAG"), "DD-STORAGE-TAG")
}

gr, err := gzip.NewReader(req.Body)
if !assert.NoError(t, err, "body is not valid gzip") {
Expand All @@ -98,33 +91,89 @@ func TestForwarder_Start(t *testing.T) {
_, err = io.ReadAll(gr)
assert.NoError(t, err, "read gzip body")

rw.WriteHeader(tc.statusCode)
w.WriteHeader(tc.statusCode)
}))
t.Cleanup(server.Close)

f := NewForwarder(&config.Config{IntakeURL: server.URL, APIKey: "test-api-key"}, server.Client(), tc.storage)

client := server.Client()
client.Transport = WithCompression(WithRetry(defaultMaxAttempts, client.Transport))
forwarder := NewForwarder(&config.Config{IntakeURL: server.URL, APIKey: "test-api-key"}, client, tc.storage)
ctx, cancel := context.WithCancel(t.Context())
t.Cleanup(cancel)

if tc.cancelCtx {
cancel()
}

in := make(chan model.LogEntry, len(tc.entries))
for _, e := range tc.entries {
in <- e
}
close(in)

err := f.Start(ctx, in)
err := forwarder.Start(ctx, in)

assert.Equal(t, tc.wantCalls, int(callCount.Load()))
if tc.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
assert.Equal(t, tc.wantCalls, int(callCount.Load()))
})
}
}

func TestForwarder_Start_Context(t *testing.T) {
t.Parallel()

tests := map[string]struct {
ctxBuilder func(t *testing.T) (context.Context, context.CancelFunc)
throttling time.Duration
wantErr error
}{
"pre-canceled": {
ctxBuilder: func(t *testing.T) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(t.Context())
cancel()
return ctx, cancel
},
wantErr: context.Canceled,
},
"pre-timeout": {
ctxBuilder: func(t *testing.T) (context.Context, context.CancelFunc) {
return context.WithTimeout(t.Context(), -1)
},
wantErr: context.DeadlineExceeded,
},
"mid-flight timeout": {
ctxBuilder: func(t *testing.T) (context.Context, context.CancelFunc) {
return context.WithTimeout(t.Context(), 50*time.Millisecond)
},
throttling: 100 * time.Millisecond,
wantErr: context.DeadlineExceeded,
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
time.Sleep(tc.throttling)
}))
t.Cleanup(server.Close)
client := server.Client()
client.Transport = WithCompression(WithRetry(defaultMaxAttempts, client.Transport))
forwarder := NewForwarder(&config.Config{IntakeURL: server.URL, APIKey: "test-api-key"}, client, "")
ctx, cancel := tc.ctxBuilder(t)
t.Cleanup(cancel)

in := make(chan model.LogEntry, 1)
in <- model.LogEntry{}
close(in)

err := forwarder.Start(ctx, in)

if tc.wantErr != nil {
require.ErrorIs(t, err, tc.wantErr)
return
}
require.NoError(t, err)
})
}
}
Loading
Loading