Skip to content

Commit 112eca4

Browse files
authored
More specific pushback errors (#782)
* Add reason field to pushback errors * simplify everything * even simpler * inside * comments
1 parent 06bea73 commit 112eca4

7 files changed

Lines changed: 37 additions & 27 deletions

File tree

append_lifecycle.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"fmt"
2121
"net/http"
2222
"os"
23-
"strings"
2423
"sync"
2524
"sync/atomic"
2625
"time"
@@ -407,20 +406,23 @@ func (i *integrationStats) statsDecorator(delegate AddFn) AddFn {
407406
return func() (Index, error) {
408407
idx, err := f()
409408
attr := []attribute.KeyValue{}
410-
pushbackType := "" // This will be used for the pushback attribute below, empty string means no pushback
411409

412410
if err != nil {
413-
if errors.Is(err, ErrPushback) {
414-
// record the the fact there was pushback, and use the error string as the type.
415-
pushbackType = err.Error()
416-
} else {
417-
// Just flag that it's an errored request to avoid high cardinality of attribute values.
411+
switch {
412+
// Record the fact there was pushback, if any.
413+
case errors.Is(err, ErrPushbackAntispam):
414+
attr = append(attr, attribute.String("tessera.pushback", "antispam"))
415+
case errors.Is(err, ErrPushbackIntegration):
416+
attr = append(attr, attribute.String("tessera.pushback", "integration"))
417+
case errors.Is(err, ErrPushback):
418+
attr = append(attr, attribute.String("tessera.pushback", "other"))
419+
default:
420+
// If it's not a pushback, just flag that it's an errored request to avoid high cardinality of attribute values.
418421
// TODO(al): We might want to bucket errors into OTel status codes in the future, though.
419422
attr = append(attr, attribute.String("tessera.error.type", "_OTHER"))
420423
}
421424
}
422425

423-
attr = append(attr, attribute.String("tessera.pushback", strings.ReplaceAll(pushbackType, " ", "_")))
424426
attr = append(attr, attribute.Bool("tessera.duplicate", idx.IsDup))
425427

426428
appenderAddsTotal.Add(ctx, 1, metric.WithAttributes(attr...))

log.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,30 @@ package tessera
1616

1717
import (
1818
"errors"
19+
"fmt"
1920
)
2021

21-
// ErrPushback is returned by underlying storage implementations when a new entry cannot be accepted
22-
// due to overload in the system. This could be because there are too many entries with indices assigned
23-
// but which have not yet been integrated into the tree, or it could be because the antispam mechanism
24-
// is not able to keep up with recently added entries.
25-
//
26-
// Personalities encountering this error should apply back-pressure to the source of new entries
27-
// in an appropriate manner (e.g. for HTTP services, return a 503 with a Retry-After header).
28-
//
29-
// Personalities should check for this error using `errors.Is(e, ErrPushback)`.
30-
var ErrPushback = errors.New("pushback")
22+
var (
23+
// ErrPushback is returned by underlying storage implementations when a new entry cannot be accepted
24+
// due to overload in the system. This could be because there are too many entries with indices assigned
25+
// but which have not yet been integrated into the tree, or it could be because the antispam mechanism
26+
// is not able to keep up with recently added entries. It should always be wrapped with a more
27+
// specific error to provide context to clients.
28+
//
29+
// Personalities encountering this error should apply back-pressure to the source of new entries
30+
// in an appropriate manner (e.g. for HTTP services, return a 503 with a Retry-After header).
31+
//
32+
// Personalities should check for this error (wrapped or not) using `errors.Is(e, ErrPushback)`.
33+
ErrPushback = errors.New("pushback")
34+
// ErrPushbackAntispam is a wrapped ErrPushback. It is returned by underlying storage implementations
35+
// when an entry cannot be accepted becasue the antispam follower has fallen too far behind the size
36+
// of the integrated tree.
37+
ErrPushbackAntispam = fmt.Errorf("antispam %w", ErrPushback)
38+
// ErrPushbackIntegration is a wrapped ErrPushback. It is returned by underlying storage implementations
39+
// when an entry cannot be accepted becasue there are too many "in-flight" add requests - i.e. entries
40+
// with sequence numbers assigned, but which are not yet integrated into the log.
41+
ErrPushbackIntegration = fmt.Errorf("integration %w", ErrPushback)
42+
)
3143

3244
// Driver is the implementation-specific parts of Tessera. No methods are on here as this is not for public use.
3345
type Driver any

storage/aws/antispam/aws.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@ const (
4949
SchemaCompatibilityVersion = 1
5050
)
5151

52-
var errPushback = fmt.Errorf("antispam %w", tessera.ErrPushback)
53-
5452
// AntispamOpts allows configuration of some tunable options.
5553
type AntispamOpts struct {
5654
// MaxBatchSize is the largest number of mutations permitted in a single write operation when
@@ -215,7 +213,7 @@ func (d *AntispamStorage) Decorator() func(f tessera.AddFn) tessera.AddFn {
215213
//
216214
// We may decide in the future that serving duplicate reads is more important than catching up as quickly
217215
// as possible, in which case we'd move this check down below the call to index.
218-
return func() (tessera.Index, error) { return tessera.Index{}, errPushback }
216+
return func() (tessera.Index, error) { return tessera.Index{}, tessera.ErrPushbackAntispam }
219217
}
220218
idx, err := d.index(ctx, e.Identity())
221219
if err != nil {

storage/aws/aws.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1021,7 +1021,7 @@ func (s *mySQLSequencer) assignEntries(ctx context.Context, entries []*tessera.E
10211021
// Check whether there are too many outstanding entries and we should apply
10221022
// back-pressure.
10231023
if outstanding := next - treeSize; outstanding > s.maxOutstanding {
1024-
return tessera.ErrPushback
1024+
return tessera.ErrPushbackIntegration
10251025
}
10261026

10271027
sequencedEntries := make([]storage.SequencedEntry, len(entries))

storage/gcp/antispam/gcp.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@ const (
4747
DefaultPushbackThreshold = 2048
4848
)
4949

50-
var errPushback = fmt.Errorf("antispam %w", tessera.ErrPushback)
51-
5250
// AntispamOpts allows configuration of some tunable options.
5351
type AntispamOpts struct {
5452
// MaxBatchSize is the largest number of mutations permitted in a single BatchWrite operation when
@@ -168,7 +166,7 @@ func (d *AntispamStorage) Decorator() func(f tessera.AddFn) tessera.AddFn {
168166
//
169167
// We may decide in the future that serving duplicate reads is more important than catching up as quickly
170168
// as possible, in which case we'd move this check down below the call to index.
171-
return func() (tessera.Index, error) { return tessera.Index{}, errPushback }
169+
return func() (tessera.Index, error) { return tessera.Index{}, tessera.ErrPushbackAntispam }
172170
}
173171
idx, err := d.index(ctx, e.Identity())
174172
if err != nil {

storage/gcp/gcp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -808,7 +808,7 @@ func (s *spannerCoordinator) assignEntries(ctx context.Context, entries []*tesse
808808
// Check whether there are too many outstanding entries and we should apply
809809
// back-pressure.
810810
if outstanding := next - treeSize; outstanding > int64(s.maxOutstanding) {
811-
return tessera.ErrPushback
811+
return tessera.ErrPushbackIntegration
812812
}
813813

814814
next := uint64(next) // Shadow next with a uint64 version of the same value to save on casts.

storage/posix/antispam/badger.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func (d *AntispamStorage) Decorator() func(f tessera.AddFn) tessera.AddFn {
170170
//
171171
// We may decide in the future that serving duplicate reads is more important than catching up as quickly
172172
// as possible, in which case we'd move this check down below the call to index.
173-
return func() (tessera.Index, error) { return tessera.Index{}, tessera.ErrPushback }
173+
return func() (tessera.Index, error) { return tessera.Index{}, tessera.ErrPushbackAntispam }
174174
}
175175
idx, err := d.index(ctx, e.Identity())
176176
if err != nil {

0 commit comments

Comments
 (0)