Skip to content

Commit 0211b4d

Browse files
committed
feat: use time-unbiased algorithm for histogram reservoir
1 parent 4086701 commit 0211b4d

7 files changed

Lines changed: 269 additions & 197 deletions

File tree

sdk/metric/exemplar/benchmark_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func BenchmarkFixedSizeReservoirOffer(b *testing.B) {
2323
// number of collect calls.
2424
if i%100 == 99 {
2525
reservoir.mu.Lock()
26-
reservoir.reset()
26+
reservoir.nt.reset()
2727
reservoir.mu.Unlock()
2828
}
2929
i++

sdk/metric/exemplar/fixed_size_reservoir.go

Lines changed: 19 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
55

66
import (
77
"context"
8-
"math"
9-
"math/rand/v2"
108
"sync"
119
"time"
1210

@@ -26,7 +24,10 @@ func FixedSizeReservoirProvider(k int) ReservoirProvider {
2624
// sample each one. If there are more than k, the Reservoir will then randomly
2725
// sample all additional measurement with a decreasing probability.
2826
func NewFixedSizeReservoir(k int) *FixedSizeReservoir {
29-
return newFixedSizeReservoir(newStorage(k))
27+
return &FixedSizeReservoir{
28+
nt: newNextTracker(k),
29+
storage: make([]measurement, k),
30+
}
3031
}
3132

3233
var _ Reservoir = &FixedSizeReservoir{}
@@ -37,41 +38,9 @@ var _ Reservoir = &FixedSizeReservoir{}
3738
// additional measurement with a decreasing probability.
3839
type FixedSizeReservoir struct {
3940
reservoir.ConcurrentSafe
40-
*storage
41-
mu sync.Mutex
42-
43-
// count is the number of measurement seen.
44-
count int64
45-
// next is the next count that will store a measurement at a random index
46-
// once the reservoir has been filled.
47-
next int64
48-
// w is the largest random number in a distribution that is used to compute
49-
// the next next.
50-
w float64
51-
}
52-
53-
func newFixedSizeReservoir(s *storage) *FixedSizeReservoir {
54-
r := &FixedSizeReservoir{
55-
storage: s,
56-
}
57-
r.reset()
58-
return r
59-
}
60-
61-
// randomFloat64 returns, as a float64, a uniform pseudo-random number in the
62-
// open interval (0.0,1.0).
63-
func (*FixedSizeReservoir) randomFloat64() float64 {
64-
// TODO: Use an algorithm that avoids rejection sampling. For example:
65-
//
66-
// const precision = 1 << 53 // 2^53
67-
// // Generate an integer in [1, 2^53 - 1]
68-
// v := rand.Uint64() % (precision - 1) + 1
69-
// return float64(v) / float64(precision)
70-
f := rand.Float64()
71-
for f == 0 {
72-
f = rand.Float64()
73-
}
74-
return f
41+
mu sync.Mutex
42+
storage []measurement
43+
nt *nextTracker
7544
}
7645

7746
// Offer accepts the parameters associated with a measurement. The
@@ -86,108 +55,12 @@ func (*FixedSizeReservoir) randomFloat64() float64 {
8655
// parameters are the value and dropped (filtered) attributes of the
8756
// measurement respectively.
8857
func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
89-
// The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December
90-
// 1994). "Reservoir-Sampling Algorithms of Time Complexity
91-
// O(n(1+log(N/n)))". ACM Transactions on Mathematical Software. 20 (4):
92-
// 481–493 (https://dl.acm.org/doi/10.1145/198429.198435).
93-
//
94-
// A high-level overview of "Algorithm L":
95-
// 0) Pre-calculate the random count greater than the storage size when
96-
// an exemplar will be replaced.
97-
// 1) Accept all measurements offered until the configured storage size is
98-
// reached.
99-
// 2) Loop:
100-
// a) When the pre-calculate count is reached, replace a random
101-
// existing exemplar with the offered measurement.
102-
// b) Calculate the next random count greater than the existing one
103-
// which will replace another exemplars
104-
//
105-
// The way a "replacement" count is computed is by looking at `n` number of
106-
// independent random numbers each corresponding to an offered measurement.
107-
// Of these numbers the smallest `k` (the same size as the storage
108-
// capacity) of them are kept as a subset. The maximum value in this
109-
// subset, called `w` is used to weight another random number generation
110-
// for the next count that will be considered.
111-
//
112-
// By weighting the next count computation like described, it is able to
113-
// perform a uniformly-weighted sampling algorithm based on the number of
114-
// samples the reservoir has seen so far. The sampling will "slow down" as
115-
// more and more samples are offered so as to reduce a bias towards those
116-
// offered just prior to the end of the collection.
117-
//
118-
// This algorithm is preferred because of its balance of simplicity and
119-
// performance. It will compute three random numbers (the bulk of
120-
// computation time) for each item that becomes part of the reservoir, but
121-
// it does not spend any time on items that do not. In particular it has an
122-
// asymptotic runtime of O(k(1 + log(n/k)) where n is the number of
123-
// measurements offered and k is the reservoir size.
124-
//
125-
// See https://en.wikipedia.org/wiki/Reservoir_sampling for an overview of
126-
// this and other reservoir sampling algorithms. See
127-
// https://github.com/MrAlias/reservoir-sampling for a performance
128-
// comparison of reservoir sampling algorithms.
129-
13058
r.mu.Lock()
13159
defer r.mu.Unlock()
132-
if int(r.count) < cap(r.measurements) {
133-
r.store(ctx, int(r.count), t, n, a)
134-
} else if r.count == r.next {
135-
// Overwrite a random existing measurement with the one offered.
136-
idx := int(rand.Int64N(int64(cap(r.measurements))))
137-
r.store(ctx, idx, t, n, a)
138-
r.advance()
60+
sampled, idx := r.nt.shouldSample()
61+
if sampled {
62+
r.storage[idx].store(ctx, t, n, a)
13963
}
140-
r.count++
141-
}
142-
143-
// reset resets r to the initial state.
144-
func (r *FixedSizeReservoir) reset() {
145-
// This resets the number of exemplars known.
146-
r.count = 0
147-
// Random index inserts should only happen after the storage is full.
148-
r.next = int64(cap(r.measurements))
149-
150-
// Initial random number in the series used to generate r.next.
151-
//
152-
// This is set before r.advance to reset or initialize the random number
153-
// series. Without doing so it would always be 0 or never restart a new
154-
// random number series.
155-
//
156-
// This maps the uniform random number in (0,1) to a geometric distribution
157-
// over the same interval. The mean of the distribution is inversely
158-
// proportional to the storage capacity.
159-
r.w = math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.measurements)))
160-
161-
r.advance()
162-
}
163-
164-
// advance updates the count at which the offered measurement will overwrite an
165-
// existing exemplar.
166-
func (r *FixedSizeReservoir) advance() {
167-
// Calculate the next value in the random number series.
168-
//
169-
// The current value of r.w is based on the max of a distribution of random
170-
// numbers (i.e. `w = max(u_1,u_2,...,u_k)` for `k` equal to the capacity
171-
// of the storage and each `u` in the interval (0,w)). To calculate the
172-
// next r.w we use the fact that when the next exemplar is selected to be
173-
// included in the storage an existing one will be dropped, and the
174-
// corresponding random number in the set used to calculate r.w will also
175-
// be replaced. The replacement random number will also be within (0,w),
176-
// therefore the next r.w will be based on the same distribution (i.e.
177-
// `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by
178-
// computing the next random number `u` and take r.w as `w * u^(1/k)`.
179-
r.w *= math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.measurements)))
180-
// Use the new random number in the series to calculate the count of the
181-
// next measurement that will be stored.
182-
//
183-
// Given 0 < r.w < 1, each iteration will result in subsequent r.w being
184-
// smaller. This translates here into the next next being selected against
185-
// a distribution with a higher mean (i.e. the expected value will increase
186-
// and replacements become less likely)
187-
//
188-
// Important to note, the new r.next will always be at least 1 more than
189-
// the last r.next.
190-
r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1
19164
}
19265

19366
// Collect returns all the held exemplars.
@@ -196,10 +69,17 @@ func (r *FixedSizeReservoir) advance() {
19669
func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) {
19770
r.mu.Lock()
19871
defer r.mu.Unlock()
199-
r.storage.Collect(dest)
72+
*dest = reset(*dest, len(r.storage), len(r.storage))
73+
var n int
74+
for i := range r.storage {
75+
if r.storage[i].exemplar(&(*dest)[n]) {
76+
n++
77+
}
78+
}
79+
*dest = (*dest)[:n]
20080
// Call reset here even though it will reset r.count and restart the random
20181
// number series. This will persist any old exemplars as long as no new
20282
// measurements are offered, but it will also prioritize those new
20383
// measurements that are made over the older collection cycle ones.
204-
r.reset()
84+
r.nt.reset()
20585
}

sdk/metric/exemplar/fixed_size_reservoir_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) {
4545
}
4646

4747
var sum float64
48-
for i := range r.measurements {
49-
sum += r.measurements[i].Value.Float64()
48+
for i := range r.storage {
49+
sum += r.storage[i].Value.Float64()
5050
}
5151
mean := sum / float64(sampleSize)
5252

sdk/metric/exemplar/histogram_reservoir.go

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"slices"
99
"sort"
10+
"sync"
1011
"time"
1112

1213
"go.opentelemetry.io/otel/attribute"
@@ -22,29 +23,38 @@ func HistogramReservoirProvider(bounds []float64) ReservoirProvider {
2223
}
2324
}
2425

25-
// NewHistogramReservoir returns a [HistogramReservoir] that samples the last
26-
// measurement that falls within a histogram bucket. The histogram bucket
27-
// upper-boundaries are define by bounds.
26+
type bucket struct {
27+
mu sync.Mutex
28+
nt nextTracker
29+
measurement
30+
}
31+
32+
// NewHistogramReservoir returns a [HistogramReservoir] that samples
33+
// measurements that fall within a histogram bucket using Algorithm L. The
34+
// histogram bucket upper-boundaries are defined by bounds.
2835
//
2936
// The passed bounds must be sorted before calling this function.
3037
func NewHistogramReservoir(bounds []float64) *HistogramReservoir {
38+
buckets := make([]bucket, len(bounds)+1)
39+
for i := range buckets {
40+
buckets[i].nt = *newNextTracker(1)
41+
}
3142
return &HistogramReservoir{
3243
bounds: bounds,
33-
storage: newStorage(len(bounds) + 1),
44+
buckets: buckets,
3445
}
3546
}
3647

3748
var _ Reservoir = &HistogramReservoir{}
3849

39-
// HistogramReservoir is a [Reservoir] that samples the last measurement that
40-
// falls within a histogram bucket. The histogram bucket upper-boundaries are
41-
// define by bounds.
50+
// HistogramReservoir is a [Reservoir] that samples
51+
// measurements that fall within a histogram bucket using Algorithm L. The
52+
// histogram bucket upper-boundaries are defined by bounds.
4253
type HistogramReservoir struct {
4354
reservoir.ConcurrentSafe
44-
*storage
45-
4655
// bounds are bucket bounds in ascending order.
47-
bounds []float64
56+
bounds []float64
57+
buckets []bucket
4858
}
4959

5060
// Offer accepts the parameters associated with a measurement. The
@@ -69,14 +79,31 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a
6979
panic("unknown value type")
7080
}
7181

72-
idx := sort.SearchFloat64s(r.bounds, n)
82+
b := &r.buckets[sort.SearchFloat64s(r.bounds, n)]
7383

74-
r.store(ctx, idx, t, v, a)
84+
b.mu.Lock()
85+
defer b.mu.Unlock()
86+
87+
sampled, _ := b.nt.shouldSample()
88+
if sampled {
89+
b.store(ctx, t, v, a)
90+
}
7591
}
7692

7793
// Collect returns all the held exemplars.
7894
//
7995
// The Reservoir state is preserved after this call.
8096
func (r *HistogramReservoir) Collect(dest *[]Exemplar) {
81-
r.storage.Collect(dest)
97+
*dest = reset(*dest, len(r.buckets), len(r.buckets))
98+
var n int
99+
for i := range r.buckets {
100+
b := &r.buckets[i]
101+
b.mu.Lock()
102+
if b.exemplar(&(*dest)[n]) {
103+
n++
104+
}
105+
b.nt.reset()
106+
b.mu.Unlock()
107+
}
108+
*dest = (*dest)[:n]
82109
}

sdk/metric/exemplar/histogram_reservoir_test.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@
33

44
package exemplar
55

6-
import "testing"
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
)
712

813
func TestHist(t *testing.T) {
914
bounds := []float64{0, 100}
@@ -25,3 +30,35 @@ func TestHistogramReservoirConcurrentSafe(t *testing.T) {
2530
return HistogramReservoirProvider(bounds), len(bounds)
2631
}))
2732
}
33+
34+
func TestHistogramReservoirTimeUnbiased(t *testing.T) {
35+
bounds := []float64{10}
36+
r := NewHistogramReservoir(bounds)
37+
38+
const (
39+
N = 100 // Items per run
40+
M = 20000 // Number of runs
41+
)
42+
43+
var sum float64
44+
var dest []Exemplar
45+
46+
for range M {
47+
for j := 1; j <= N; j++ {
48+
val := 10.0 * float64(j) / float64(N)
49+
r.Offer(t.Context(), staticTime, NewValue(val), nil)
50+
}
51+
r.Collect(&dest)
52+
require.Len(t, dest, 1)
53+
sum += dest[0].Value.Float64()
54+
dest = dest[:0]
55+
}
56+
57+
mean := sum / float64(M)
58+
expectedMean := 5.0 * float64(N+1) / float64(N)
59+
60+
// Standard deviation of the discrete uniform distribution is approx 2.88.
61+
// Standard error of the mean for M=20000 is 2.88 / sqrt(20000) approx 0.02.
62+
// A delta of 0.1 is approx 5 standard errors, which makes flakiness extremely unlikely.
63+
assert.InDelta(t, expectedMean, mean, 0.1)
64+
}

0 commit comments

Comments
 (0)