Skip to content

Commit 7a95380

Browse files
authored
Add POSIX Antispam Metrics (#913)
* Antispam POSIX metrics * dynamically generate batchEntriesBuckets * comments about microseconds * number of nrw before success * fix names * histogram * fixes * count sucesses rather than no_rewrite * indent * 1 based
1 parent aba684b commit 7a95380

2 files changed

Lines changed: 142 additions & 8 deletions

File tree

storage/posix/antispam/badger.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/transparency-dev/tessera"
3434
"github.com/transparency-dev/tessera/client"
3535
"github.com/transparency-dev/tessera/internal/otel"
36+
"go.opentelemetry.io/otel/metric"
3637
"go.opentelemetry.io/otel/trace"
3738
)
3839

@@ -96,6 +97,8 @@ func NewAntispam(ctx context.Context, badgerPath string, opts AntispamOpts) (*An
9697
}
9798

9899
go func() {
100+
// runsInTick tracks the number of GC runs per tick.
101+
var runsInTick int64
99102
ticker := time.NewTicker(1 * time.Minute)
100103
defer ticker.Stop()
101104
for {
@@ -105,8 +108,24 @@ func NewAntispam(ctx context.Context, badgerPath string, opts AntispamOpts) (*An
105108
case <-ticker.C:
106109
}
107110

111+
runsInTick = 0
108112
again:
113+
start := time.Now()
114+
runsInTick++
109115
err := db.RunValueLogGC(0.7)
116+
status := "success"
117+
if err != nil {
118+
// We're done, export the number of runs we did.
119+
gcRunsPerTick.Record(ctx, runsInTick)
120+
if errors.Is(err, badger.ErrNoRewrite) {
121+
status = "no_rewrite"
122+
} else {
123+
status = "failure"
124+
}
125+
}
126+
attr := metric.WithAttributes(gcStatusKey.String(status))
127+
gcCounter.Add(ctx, 1, attr)
128+
gcDuration.Record(ctx, float64(time.Since(start).Milliseconds()), attr)
110129
if err == nil {
111130
goto again
112131
}
@@ -126,32 +145,32 @@ type AntispamStorage struct {
126145
// currently integrated tree size.
127146
// When pushBack is true, the decorator will start returning ErrPushback to all calls.
128147
pushBack atomic.Bool
129-
130-
numLookups atomic.Uint64
131-
numWrites atomic.Uint64
132-
numHits atomic.Uint64
133148
}
134149

135150
// index returns the index (if any) previously associated with the provided hash
136151
func (d *AntispamStorage) index(ctx context.Context, h []byte) (*uint64, error) {
137152
return otel.Trace(ctx, "tessera.antispam.badger.index", tracer, func(ctx context.Context, span trace.Span) (*uint64, error) {
138-
d.numLookups.Add(1)
153+
start := time.Now()
139154
var idx *uint64
155+
var hit bool
140156
err := d.db.View(func(txn *badger.Txn) error {
141157
item, err := txn.Get(h)
142158
if err == badger.ErrKeyNotFound {
143159
span.AddEvent("tessera.miss")
144160
return nil
145161
}
146162
span.AddEvent("tessera.hit")
147-
d.numHits.Add(1)
163+
hit = true
148164

149165
return item.Value(func(v []byte) error {
150166
i := binary.BigEndian.Uint64(v)
151167
idx = &i
152168
return nil
153169
})
154170
})
171+
// Microseconds / 1000.0 and not milliseconds to record sub-millisecond durations.
172+
lookupDuration.Record(ctx, float64(time.Since(start).Microseconds())/1000.0, metric.WithAttributes(hitKey.Bool(hit)))
173+
lookupCounter.Add(ctx, 1, metric.WithAttributes(hitKey.Bool(hit)))
155174
return idx, err
156175
})
157176
}
@@ -240,7 +259,8 @@ func (f *follower) Follow(ctx context.Context, lr tessera.LogReader) {
240259
for moreWork := true; moreWork; {
241260

242261
err := f.as.db.Update(func(txn *badger.Txn) error {
243-
return otel.TraceErr(ctx, "tessera.antispam.badger.FollowTxn", tracer, func(ctx context.Context, span trace.Span) error {
262+
return otel.TraceErr(ctx, "tessera.antispam.badger.follow_txn", tracer, func(ctx context.Context, span trace.Span) error {
263+
batchStart := time.Now()
244264
ctx, cancel := context.WithTimeout(ctx, defaultBatchTimeout)
245265
defer cancel()
246266

@@ -352,7 +372,6 @@ func (f *follower) Follow(ctx context.Context, lr tessera.LogReader) {
352372
}
353373

354374
numAdded := uint64(len(curEntries))
355-
f.as.numWrites.Add(numAdded)
356375

357376
// and update the follower state
358377
b := make([]byte, 8)
@@ -361,6 +380,10 @@ func (f *follower) Follow(ctx context.Context, lr tessera.LogReader) {
361380
return fmt.Errorf("failed to update follower state: %v", err)
362381
}
363382

383+
// Microseconds / 1000.0 and not milliseconds to record sub-millisecond durations.
384+
followTxnDuration.Record(ctx, float64(time.Since(batchStart).Microseconds())/1000.0)
385+
followTxnEntriesCounter.Record(ctx, int64(numAdded))
386+
364387
return ctx.Err()
365388
})
366389
})

storage/posix/antispam/otel.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,128 @@
1515
package badger
1616

1717
import (
18+
"context"
19+
"log/slog"
20+
"os"
21+
1822
"go.opentelemetry.io/otel"
1923
"go.opentelemetry.io/otel/attribute"
24+
"go.opentelemetry.io/otel/metric"
2025
)
2126

2227
const name = "github.com/transparency-dev/tessera/storage/posix/antispam"
2328

2429
var (
2530
tracer = otel.Tracer(name)
31+
meter = otel.Meter(name)
2632
)
2733

2834
var (
2935
followFromKey = attribute.Key("tessera.followFrom")
3036
pushbackKey = attribute.Key("tessera.pushback")
37+
gcStatusKey = attribute.Key("status")
38+
hitKey = attribute.Key("hit")
39+
)
40+
41+
var (
42+
gcCounter metric.Int64Counter
43+
gcRunsPerTick metric.Int64Histogram
44+
gcDuration metric.Float64Histogram
45+
lookupCounter metric.Int64Counter
46+
lookupDuration metric.Float64Histogram
47+
followTxnEntriesCounter metric.Int64Histogram
48+
followTxnDuration metric.Float64Histogram
49+
50+
// Histogram buckets for operations (latency in ms)
51+
histogramBuckets = []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1200, 1400, 1600, 1800, 2000, 2500, 3000, 4000, 5000, 6000, 8000, 10000}
52+
53+
// GC buckets for longer running operations.
54+
gcDurationBuckets = []float64{10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000}
3155
)
56+
57+
func generateBatchBuckets(max int) []float64 {
58+
bases := []float64{1, 2, 5}
59+
buckets := []float64{}
60+
for multiplier := 1.0; ; multiplier *= 10 {
61+
for _, b := range bases {
62+
v := b * multiplier
63+
if v >= float64(max) {
64+
buckets = append(buckets, float64(max))
65+
return buckets
66+
}
67+
buckets = append(buckets, v)
68+
}
69+
}
70+
}
71+
72+
func init() {
73+
var err error
74+
75+
gcCounter, err = meter.Int64Counter(
76+
"tessera.antispam.badger.gc.count",
77+
metric.WithDescription("Number of BadgerDB Garbage Collection runs"),
78+
metric.WithUnit("{run}"))
79+
if err != nil {
80+
slog.ErrorContext(context.Background(), "Failed to create tessera.antispam.badger.gc.count metric", slog.Any("error", err))
81+
os.Exit(1)
82+
}
83+
84+
gcRunsPerTick, err = meter.Int64Histogram(
85+
"tessera.antispam.badger.gc.runs_per_tick",
86+
metric.WithDescription("Number of GC runs per tick"),
87+
metric.WithUnit("{run}"),
88+
metric.WithExplicitBucketBoundaries(generateBatchBuckets(500)...))
89+
if err != nil {
90+
slog.ErrorContext(context.Background(), "Failed to create tessera.antispam.badger.gc.runs_per_tick metric", slog.Any("error", err))
91+
os.Exit(1)
92+
}
93+
94+
gcDuration, err = meter.Float64Histogram(
95+
"tessera.antispam.badger.gc.duration",
96+
metric.WithDescription("Duration of BadgerDB Garbage Collection runs"),
97+
metric.WithUnit("ms"),
98+
metric.WithExplicitBucketBoundaries(gcDurationBuckets...))
99+
if err != nil {
100+
slog.ErrorContext(context.Background(), "Failed to create tessera.antispam.badger.gc.duration metric", slog.Any("error", err))
101+
os.Exit(1)
102+
}
103+
104+
lookupCounter, err = meter.Int64Counter(
105+
"tessera.antispam.badger.index.count",
106+
metric.WithDescription("Number of BadgerDB identity lookups"),
107+
metric.WithUnit("{lookup}"))
108+
if err != nil {
109+
slog.ErrorContext(context.Background(), "Failed to create tessera.antispam.badger.index.count metric", slog.Any("error", err))
110+
os.Exit(1)
111+
}
112+
113+
lookupDuration, err = meter.Float64Histogram(
114+
"tessera.antispam.badger.index.duration",
115+
metric.WithDescription("Duration of BadgerDB identity lookups"),
116+
metric.WithUnit("ms"),
117+
metric.WithExplicitBucketBoundaries(histogramBuckets...))
118+
if err != nil {
119+
slog.ErrorContext(context.Background(), "Failed to create tessera.antispam.badger.index.duration metric", slog.Any("error", err))
120+
os.Exit(1)
121+
}
122+
123+
followTxnEntriesCounter, err = meter.Int64Histogram(
124+
"tessera.antispam.badger.follow_txn.entries.count",
125+
metric.WithDescription("Number of entries processed by BadgerDB antispam Follow transactions"),
126+
metric.WithUnit("{entry}"),
127+
metric.WithExplicitBucketBoundaries(generateBatchBuckets(int(DefaultMaxBatchSize))...))
128+
if err != nil {
129+
slog.ErrorContext(context.Background(), "Failed to create tessera.antispam.badger.follow_txn.entries.count metric", slog.Any("error", err))
130+
os.Exit(1)
131+
}
132+
133+
followTxnDuration, err = meter.Float64Histogram(
134+
"tessera.antispam.badger.FollowTxn.duration",
135+
metric.WithDescription("Duration of BadgerDB Follow transactions"),
136+
metric.WithUnit("ms"),
137+
metric.WithExplicitBucketBoundaries(histogramBuckets...))
138+
if err != nil {
139+
slog.ErrorContext(context.Background(), "Failed to create tessera.antispam.badger.FollowTxn.duration metric", slog.Any("error", err))
140+
os.Exit(1)
141+
}
142+
}

0 commit comments

Comments
 (0)