Skip to content

Commit babe558

Browse files
authored
Batcher refactor and performance observability (#2310)
* Remove incorrectly used sandbox context from feature flag resolve * Batcher refactor for easier reading, performance metrics The batcher loop is now easier to navigate. Timed waits replaced with a ticker. Metrics for performance monitoring are now included. Different batchers can be run in parallel, with a name attribute used to distinguish them. * Use batcher naming * Trace batch callback for speed * Allow export of `batcher` metrics * Fix Push/Stop race condition and stale doc comment in batcher Guard Start, Stop, and Push with a sync.RWMutex to prevent a send-on-closed-channel panic that could occur when Stop closed b.ch between Push's started check and the channel send. Also update the QueueSize doc comment to reflect that Push now returns ErrBatcherQueueFull instead of the old (false, nil). * Reset ticker after each flush for consistent MaxDelay window Without reset, a size-triggered flush near a tick boundary would give the next batch less than MaxDelay to accumulate. Now each batch always gets a full MaxDelay window after the previous flush.
1 parent 680d368 commit babe558

File tree

7 files changed

+180
-198
lines changed

7 files changed

+180
-198
lines changed

iac/modules/job-otel-collector/configs/otel-collector.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ processors:
217217
- "orchestrator.*"
218218
- "template.*"
219219
- "api.*"
220+
- "batcher.*"
220221
- "db.sql.connection.*"
221222
- "db.client.*"
222223
- "vault.*"

packages/clickhouse/go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ require (
1010
github.com/ClickHouse/clickhouse-go/v2 v2.40.1
1111
github.com/e2b-dev/infra/packages/shared v0.0.0
1212
github.com/google/uuid v1.6.0
13+
go.opentelemetry.io/otel v1.41.0
14+
go.opentelemetry.io/otel/metric v1.41.0
15+
go.opentelemetry.io/otel/trace v1.41.0
1316
go.uber.org/zap v1.27.1
1417
)
1518

@@ -77,17 +80,14 @@ require (
7780
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
7881
go.opentelemetry.io/contrib/bridges/otelzap v0.14.0 // indirect
7982
go.opentelemetry.io/contrib/instrumentation/runtime v0.66.0 // indirect
80-
go.opentelemetry.io/otel v1.41.0 // indirect
8183
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.15.0 // indirect
8284
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.39.0 // indirect
8385
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect
8486
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 // indirect
8587
go.opentelemetry.io/otel/log v0.15.0 // indirect
86-
go.opentelemetry.io/otel/metric v1.41.0 // indirect
8788
go.opentelemetry.io/otel/sdk v1.41.0 // indirect
8889
go.opentelemetry.io/otel/sdk/log v0.15.0 // indirect
8990
go.opentelemetry.io/otel/sdk/metric v1.41.0 // indirect
90-
go.opentelemetry.io/otel/trace v1.41.0 // indirect
9191
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
9292
go.uber.org/mock v0.5.2 // indirect
9393
go.uber.org/multierr v1.11.0 // indirect

packages/clickhouse/pkg/batcher/batcher.go

Lines changed: 109 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,14 @@ package batcher
33
import (
44
"context"
55
"errors"
6+
"sync"
67
"time"
8+
9+
"go.opentelemetry.io/otel"
10+
"go.opentelemetry.io/otel/attribute"
11+
"go.opentelemetry.io/otel/metric"
12+
13+
"github.com/e2b-dev/infra/packages/shared/pkg/utils"
714
)
815

916
const (
@@ -19,6 +26,16 @@ var (
1926
ErrBatcherQueueFull = errors.New("batcher queue is full")
2027
)
2128

29+
var (
30+
meter = otel.Meter("github.com/e2b-dev/infra/packages/clickhouse/pkg/batcher")
31+
32+
mItemsDropped = utils.Must(meter.Int64Counter("batcher.items.dropped", metric.WithDescription("Number of items dropped because the batcher queue was full"), metric.WithUnit("{item}")))
33+
mQueueLen = utils.Must(meter.Int64Gauge("batcher.queue.length", metric.WithDescription("Current number of items waiting in the batcher queue"), metric.WithUnit("{item}")))
34+
mFlushBatchSize = utils.Must(meter.Int64Histogram("batcher.flush.batch_size", metric.WithDescription("Number of items per flushed batch"), metric.WithUnit("{item}")))
35+
mFlushWaitDuration = utils.Must(meter.Int64Histogram("batcher.flush.wait_duration", metric.WithDescription("Time from first item enqueued in a batch to when the batch is flushed"), metric.WithUnit("ms")))
36+
mFlushDuration = utils.Must(meter.Int64Histogram("batcher.flush.duration", metric.WithDescription("Time spent executing BatcherFunc per flush"), metric.WithUnit("ms")))
37+
)
38+
2239
// Batcher groups items in batches and calls Func on them.
2340
//
2441
// See also BytesBatcher.
@@ -29,20 +46,24 @@ type Batcher[T any] struct {
2946
// Maximum batch size that will be passed to BatcherFunc.
3047
MaxBatchSize int
3148

32-
// Maximum delay between Push() and BatcherFunc call.
49+
// Maximum delay between the first item being enqueued in a batch and the
50+
// BatcherFunc call for that batch.
3351
MaxDelay time.Duration
3452

3553
// Maximum unprocessed items' queue size.
3654
QueueSize int
3755

38-
// ErrorHandler is called when BatcherFunc returns an error
39-
// If not set, errors from BatcherFunc will be silently dropped
40-
// This allows customizing error handling behavior - e.g. logging, metrics, etc.
56+
// ErrorHandler is called when BatcherFunc returns an error.
57+
// If not set, errors from BatcherFunc will be silently dropped.
4158
ErrorHandler func(error)
4259

4360
// Synchronization primitives.
44-
ch chan T
45-
doneCh chan struct{}
61+
mu sync.RWMutex
62+
ch chan T
63+
doneCh chan struct{}
64+
started bool
65+
66+
attrs metric.MeasurementOption
4667
}
4768

4869
// BatcherFunc is called by Batcher when batch is ready to be processed.
@@ -52,21 +73,24 @@ type Batcher[T any] struct {
5273
type BatcherFunc[T any] func(ctx context.Context, batch []T) error
5374

5475
type BatcherOptions struct {
55-
// MaxBatchSize is the maximum number of items that will be collected into a single batch
56-
// before being flushed by the BatcherFunc
76+
// Name is added as the "batcher" attribute on all metrics, allowing different
77+
// batcher instances to be identified in dashboards (e.g. "sandbox-events", "billing-export").
78+
Name string
79+
80+
// MaxBatchSize is the maximum number of items collected into a single batch
81+
// before being flushed by the BatcherFunc.
5782
MaxBatchSize int
5883

59-
// MaxDelay is the maximum time to wait for a batch to fill up before flushing it,
60-
// even if the batch size hasn't reached MaxBatchSize
84+
// MaxDelay is the maximum time between the first item being enqueued in a
85+
// batch and the BatcherFunc call for that batch.
6186
MaxDelay time.Duration
6287

63-
// QueueSize is the size of the channel buffer used to queue incoming items
64-
// If the queue is full, new items will be rejected
88+
// QueueSize is the size of the channel buffer used to queue incoming items.
89+
// Items pushed when the queue is full are dropped (Push returns ErrBatcherQueueFull).
6590
QueueSize int
6691

67-
// ErrorHandler is called when BatcherFunc returns an error
68-
// If not set, errors from BatcherFunc will be silently dropped
69-
// This allows customizing error handling behavior - e.g. logging, metrics, etc.
92+
// ErrorHandler is called when BatcherFunc returns an error.
93+
// If not set, errors from BatcherFunc will be silently dropped.
7094
ErrorHandler func(error)
7195
}
7296

@@ -78,16 +102,16 @@ func NewBatcher[T any](fn BatcherFunc[T], cfg BatcherOptions) (*Batcher[T], erro
78102
MaxDelay: cfg.MaxDelay,
79103
QueueSize: cfg.QueueSize,
80104
ErrorHandler: cfg.ErrorHandler,
105+
106+
attrs: metric.WithAttributeSet(attribute.NewSet(attribute.String("batcher", cfg.Name))),
81107
}
82108

83109
if b.Func == nil {
84110
return nil, ErrFuncNotSet
85111
}
86-
87112
if b.ErrorHandler == nil {
88113
b.ErrorHandler = func(error) {}
89114
}
90-
91115
if b.MaxBatchSize <= 0 {
92116
b.MaxBatchSize = defaultMaxBatchSize
93117
}
@@ -101,117 +125,119 @@ func NewBatcher[T any](fn BatcherFunc[T], cfg BatcherOptions) (*Batcher[T], erro
101125
return b, nil
102126
}
103127

104-
// Initialize the synchronization primitives and start batch processing.
128+
// Start begins batch processing.
105129
func (b *Batcher[T]) Start(ctx context.Context) error {
106-
if b.ch != nil {
130+
b.mu.Lock()
131+
defer b.mu.Unlock()
132+
133+
if b.started {
107134
return ErrBatcherAlreadyStarted
108135
}
109136

110137
b.ch = make(chan T, b.QueueSize)
111138
b.doneCh = make(chan struct{})
139+
b.started = true
112140

113141
go func() {
114-
processBatches(ctx, b.Func, b.ch, b.MaxBatchSize, b.MaxDelay, b.ErrorHandler)
142+
b.processBatches(ctx)
115143
close(b.doneCh)
116144
}()
117145

118146
return nil
119147
}
120148

121-
// Stop stops batch processing.
149+
// Stop stops batch processing and flushes remaining items.
122150
func (b *Batcher[T]) Stop() error {
123-
if b.ch == nil {
151+
b.mu.Lock()
152+
if !b.started {
153+
b.mu.Unlock()
154+
124155
return ErrBatcherNotStarted
125156
}
157+
158+
b.started = false
126159
close(b.ch)
160+
b.mu.Unlock()
161+
127162
<-b.doneCh
128-
b.ch = nil
129-
b.doneCh = nil
130163

131164
return nil
132165
}
133166

134-
// Push pushes new batched item into the batcher.
135-
//
136-
// Don't forget calling Start() before pushing items into the batcher.
137-
func (b *Batcher[T]) Push(batchedItem T) (bool, error) {
138-
if b.ch == nil {
139-
return false, ErrBatcherNotStarted
167+
// Push enqueues an item for batching.
168+
// Returns ErrBatcherQueueFull immediately if the queue is full.
169+
func (b *Batcher[T]) Push(item T) error {
170+
b.mu.RLock()
171+
defer b.mu.RUnlock()
172+
173+
if !b.started {
174+
return ErrBatcherNotStarted
140175
}
176+
141177
select {
142-
case b.ch <- batchedItem:
143-
return true, nil
178+
case b.ch <- item:
179+
mQueueLen.Record(context.Background(), int64(len(b.ch)), b.attrs)
180+
181+
return nil
144182
default:
145-
return false, nil
183+
mItemsDropped.Add(context.Background(), 1, b.attrs)
184+
185+
return ErrBatcherQueueFull
146186
}
147187
}
148188

149-
// QueueLen returns the number of pending items, which weren't passed into
150-
// BatcherFunc yet.
151-
//
152-
// Maximum number of pending items is Batcher.QueueSize.
189+
// QueueLen returns the number of items pending in the queue.
153190
func (b *Batcher[T]) QueueLen() int {
154191
return len(b.ch)
155192
}
156193

157-
func processBatches[T any](ctx context.Context, f BatcherFunc[T], ch <-chan T, maxBatchedItemBatchSize int, maxBatchedItemDelay time.Duration, errorHandler func(error)) {
194+
func (b *Batcher[T]) processBatches(ctx context.Context) {
158195
var (
159-
batch []T
160-
batchedItem T
161-
ok bool
162-
lastPushTime = time.Now()
196+
batch []T
197+
batchStartTime time.Time
163198
)
164199

200+
ticker := time.NewTicker(b.MaxDelay)
201+
defer ticker.Stop()
202+
203+
flush := func() {
204+
if len(batch) == 0 {
205+
return
206+
}
207+
208+
mFlushWaitDuration.Record(ctx, time.Since(batchStartTime).Milliseconds(), b.attrs)
209+
start := time.Now()
210+
if err := b.Func(ctx, batch); err != nil {
211+
b.ErrorHandler(err)
212+
}
213+
214+
mFlushBatchSize.Record(ctx, int64(len(batch)), b.attrs)
215+
mFlushDuration.Record(ctx, time.Since(start).Milliseconds(), b.attrs)
216+
mQueueLen.Record(ctx, int64(len(b.ch)), b.attrs)
217+
218+
batch = batch[:0]
219+
ticker.Reset(b.MaxDelay)
220+
}
221+
165222
for {
166223
select {
167-
case batchedItem, ok = <-ch:
224+
case item, ok := <-b.ch:
168225
if !ok {
169-
call(ctx, f, batch, errorHandler)
226+
flush()
170227

171228
return
172229
}
173-
batch = append(batch, batchedItem)
174-
default:
230+
175231
if len(batch) == 0 {
176-
batchedItem, ok = <-ch
177-
// Flush what's left in the buffer if the batcher is stopped
178-
if !ok {
179-
call(ctx, f, batch, errorHandler)
180-
181-
return
182-
}
183-
batch = append(batch, batchedItem)
184-
} else {
185-
if delay := maxBatchedItemDelay - time.Since(lastPushTime); delay > 0 {
186-
t := acquireTimer(delay)
187-
select {
188-
case batchedItem, ok = <-ch:
189-
if !ok {
190-
call(ctx, f, batch, errorHandler)
191-
192-
return
193-
}
194-
batch = append(batch, batchedItem)
195-
case <-t.C:
196-
}
197-
releaseTimer(t)
198-
}
232+
batchStartTime = time.Now()
199233
}
200-
}
201234

202-
if len(batch) >= maxBatchedItemBatchSize || time.Since(lastPushTime) > maxBatchedItemDelay {
203-
lastPushTime = time.Now()
204-
call(ctx, f, batch, errorHandler)
205-
batch = batch[:0]
206-
}
207-
}
208-
}
209-
210-
func call[T any](ctx context.Context, f BatcherFunc[T], batch []T, errorHandler func(error)) {
211-
if len(batch) > 0 {
212-
err := f(ctx, batch)
213-
if err != nil {
214-
errorHandler(err)
235+
batch = append(batch, item)
236+
if len(batch) >= b.MaxBatchSize {
237+
flush()
238+
}
239+
case <-ticker.C:
240+
flush()
215241
}
216242
}
217243
}

0 commit comments

Comments (0)