Skip to content

Commit 8b93f8d

Browse files
committed
add Permanent Errors
1 parent c6f2b8c commit 8b93f8d

3 files changed

Lines changed: 31 additions & 6 deletions

File tree

aws/logs_monitoring_go/internal/forwarding/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ var Client = newClient()
2424
func newClient() *http.Client {
2525
transport := http.DefaultTransport.(*http.Transport).Clone()
2626
transport.TLSHandshakeTimeout = tlsHandshakeTimeout
27-
transport.MaxIdleConnsPerHost = maxConcurrency
27+
transport.MaxIdleConnsPerHost = MaxConcurrency
2828
transport.DialContext = (&net.Dialer{
2929
Timeout: dialContextTimeout,
3030
KeepAlive: dialContextKeepAlive,

aws/logs_monitoring_go/internal/forwarding/forwarding.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
"golang.org/x/sync/errgroup"
2323
)
2424

25-
const maxConcurrency = 5
25+
const MaxConcurrency = 5
2626

2727
type Forwarder struct {
2828
cfg *config.Config
@@ -39,7 +39,7 @@ func NewForwarder(cfg *config.Config, client *http.Client, storage string) Forwa
3939
}
4040

4141
func (f Forwarder) Start(ctx context.Context, in <-chan model.LogEntry) error {
42-
batches := make(chan []byte, maxConcurrency)
42+
batches := make(chan []byte, MaxConcurrency)
4343
batcher := batching.NewBatcher()
4444

4545
producerErrCh := make(chan error, 1)
@@ -49,7 +49,7 @@ func (f Forwarder) Start(ctx context.Context, in <-chan model.LogEntry) error {
4949
}()
5050

5151
var eg errgroup.Group
52-
eg.SetLimit(maxConcurrency)
52+
eg.SetLimit(MaxConcurrency)
5353

5454
var errs []error
5555
var mu sync.Mutex
@@ -66,7 +66,10 @@ func (f Forwarder) Start(ctx context.Context, in <-chan model.LogEntry) error {
6666
}
6767

6868
eg.Go(func() error {
69-
if err := f.send(ctx, body); err != nil {
69+
if err := f.Send(ctx, body); err != nil {
70+
if f.storage != nil {
71+
err = f.storage.Put(ctx, body, f.storageTag)
72+
}
7073
mu.Lock()
7174
errs = append(errs, err)
7275
mu.Unlock()
@@ -79,7 +82,7 @@ func (f Forwarder) Start(ctx context.Context, in <-chan model.LogEntry) error {
7982
return errors.Join(append(errs, <-producerErrCh)...)
8083
}
8184

82-
func (f Forwarder) send(ctx context.Context, payload []byte) error {
85+
func (f Forwarder) Send(ctx context.Context, payload []byte) error {
8386
ctx, cancel := context.WithTimeout(ctx, timeout)
8487
defer cancel()
8588

aws/logs_monitoring_go/internal/forwarding/middleware.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,16 @@ func WithRetry(maxAttempts int, next http.RoundTripper) RoundTripperFunc {
8787

8888
if !isRetryable(resp.StatusCode) {
8989
slog.LogAttrs(req.Context(), slog.LevelDebug, "request complete", attrs...)
90+
if isPermanent(resp.StatusCode) {
91+
body, err := io.ReadAll(resp.Body)
92+
if err != nil {
93+
slog.Warn("reading body response", slog.Any("error", err))
94+
}
95+
return nil, &PermanentError{
96+
StatusCode: resp.StatusCode,
97+
Reason: string(body),
98+
}
99+
}
90100
return resp, nil
91101
}
92102

@@ -102,6 +112,18 @@ func WithRetry(maxAttempts int, next http.RoundTripper) RoundTripperFunc {
102112
}
103113
}
104114

115+
func isPermanent(statusCode int) bool {
116+
switch statusCode {
117+
case http.StatusBadRequest,
118+
http.StatusUnauthorized,
119+
http.StatusForbidden,
120+
http.StatusRequestEntityTooLarge:
121+
return true
122+
default:
123+
return statusCode >= 400
124+
}
125+
}
126+
105127
func isRetryable(statusCode int) bool {
106128
switch statusCode {
107129
case http.StatusTooManyRequests,

0 commit comments

Comments
 (0)