Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ processors:
- "orchestrator.*"
- "template.*"
- "api.*"
- "batcher.*"
- "db.sql.connection.*"
- "db.client.*"
- "vault.*"
Expand Down
6 changes: 3 additions & 3 deletions packages/clickhouse/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ require (
github.com/ClickHouse/clickhouse-go/v2 v2.40.1
github.com/e2b-dev/infra/packages/shared v0.0.0
github.com/google/uuid v1.6.0
go.opentelemetry.io/otel v1.41.0
go.opentelemetry.io/otel/metric v1.41.0
go.opentelemetry.io/otel/trace v1.41.0
go.uber.org/zap v1.27.1
)

Expand Down Expand Up @@ -77,17 +80,14 @@ require (
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/bridges/otelzap v0.14.0 // indirect
go.opentelemetry.io/contrib/instrumentation/runtime v0.66.0 // indirect
go.opentelemetry.io/otel v1.41.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.15.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 // indirect
go.opentelemetry.io/otel/log v0.15.0 // indirect
go.opentelemetry.io/otel/metric v1.41.0 // indirect
go.opentelemetry.io/otel/sdk v1.41.0 // indirect
go.opentelemetry.io/otel/sdk/log v0.15.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.41.0 // indirect
go.opentelemetry.io/otel/trace v1.41.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
go.uber.org/mock v0.5.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
192 changes: 109 additions & 83 deletions packages/clickhouse/pkg/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@ package batcher
import (
"context"
"errors"
"sync"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/e2b-dev/infra/packages/shared/pkg/utils"
)

const (
Expand All @@ -19,6 +26,16 @@ var (
ErrBatcherQueueFull = errors.New("batcher queue is full")
)

// meter is the OpenTelemetry meter from which all batcher instruments below
// are created.
var meter = otel.Meter("github.com/e2b-dev/infra/packages/clickhouse/pkg/batcher")

// Instruments shared by every Batcher in the process. Measurements are
// recorded together with the per-instance "batcher" attribute so individual
// batchers can be told apart.
//
// NOTE(review): utils.Must presumably panics if instrument creation fails —
// confirm against the shared utils package.
var (
	// mItemsDropped is incremented by Push each time an item is rejected
	// because the queue is full.
	mItemsDropped = utils.Must(meter.Int64Counter(
		"batcher.items.dropped",
		metric.WithDescription("Number of items dropped because the batcher queue was full"),
		metric.WithUnit("{item}"),
	))

	// mQueueLen is sampled after every successful Push and after every flush.
	mQueueLen = utils.Must(meter.Int64Gauge(
		"batcher.queue.length",
		metric.WithDescription("Current number of items waiting in the batcher queue"),
		metric.WithUnit("{item}"),
	))

	// mFlushBatchSize records how many items were handed to BatcherFunc in a
	// single flush.
	mFlushBatchSize = utils.Must(meter.Int64Histogram(
		"batcher.flush.batch_size",
		metric.WithDescription("Number of items per flushed batch"),
		metric.WithUnit("{item}"),
	))

	// mFlushWaitDuration records, in milliseconds, how long a batch was open
	// (from its first item) before it was flushed.
	mFlushWaitDuration = utils.Must(meter.Int64Histogram(
		"batcher.flush.wait_duration",
		metric.WithDescription("Time from first item enqueued in a batch to when the batch is flushed"),
		metric.WithUnit("ms"),
	))

	// mFlushDuration records, in milliseconds, how long the BatcherFunc call
	// of a flush took.
	mFlushDuration = utils.Must(meter.Int64Histogram(
		"batcher.flush.duration",
		metric.WithDescription("Time spent executing BatcherFunc per flush"),
		metric.WithUnit("ms"),
	))
)

// Batcher groups items in batches and calls Func on them.
//
// See also BytesBatcher.
Comment thread
sitole marked this conversation as resolved.
Expand All @@ -29,20 +46,24 @@ type Batcher[T any] struct {
// Maximum batch size that will be passed to BatcherFunc.
MaxBatchSize int

// Maximum delay between Push() and BatcherFunc call.
// Maximum delay between the first item being enqueued in a batch and the
// BatcherFunc call for that batch.
MaxDelay time.Duration

// Maximum unprocessed items' queue size.
QueueSize int

// ErrorHandler is called when BatcherFunc returns an error
// If not set, errors from BatcherFunc will be silently dropped
// This allows customizing error handling behavior - e.g. logging, metrics, etc.
// ErrorHandler is called when BatcherFunc returns an error.
// If not set, errors from BatcherFunc will be silently dropped.
ErrorHandler func(error)

// Synchronization primitives.
ch chan T
doneCh chan struct{}
mu sync.RWMutex
ch chan T
doneCh chan struct{}
started bool

attrs metric.MeasurementOption
}

// BatcherFunc is called by Batcher when batch is ready to be processed.
Expand All @@ -52,21 +73,24 @@ type Batcher[T any] struct {
type BatcherFunc[T any] func(ctx context.Context, batch []T) error

type BatcherOptions struct {
// MaxBatchSize is the maximum number of items that will be collected into a single batch
// before being flushed by the BatcherFunc
// Name is added as the "batcher" attribute on all metrics, allowing different
// batcher instances to be identified in dashboards (e.g. "sandbox-events", "billing-export").
Name string

// MaxBatchSize is the maximum number of items collected into a single batch
// before being flushed by the BatcherFunc.
MaxBatchSize int

// MaxDelay is the maximum time to wait for a batch to fill up before flushing it,
// even if the batch size hasn't reached MaxBatchSize
// MaxDelay is the maximum time between the first item being enqueued in a
// batch and the BatcherFunc call for that batch.
MaxDelay time.Duration

Comment thread
sitole marked this conversation as resolved.
// QueueSize is the size of the channel buffer used to queue incoming items
// If the queue is full, new items will be rejected
// QueueSize is the size of the channel buffer used to queue incoming items.
// Items pushed when the queue is full are dropped (Push returns ErrBatcherQueueFull).
QueueSize int

// ErrorHandler is called when BatcherFunc returns an error
// If not set, errors from BatcherFunc will be silently dropped
// This allows customizing error handling behavior - e.g. logging, metrics, etc.
// ErrorHandler is called when BatcherFunc returns an error.
// If not set, errors from BatcherFunc will be silently dropped.
ErrorHandler func(error)
}

Expand All @@ -78,16 +102,16 @@ func NewBatcher[T any](fn BatcherFunc[T], cfg BatcherOptions) (*Batcher[T], erro
MaxDelay: cfg.MaxDelay,
QueueSize: cfg.QueueSize,
ErrorHandler: cfg.ErrorHandler,

attrs: metric.WithAttributeSet(attribute.NewSet(attribute.String("batcher", cfg.Name))),
}

if b.Func == nil {
return nil, ErrFuncNotSet
}

if b.ErrorHandler == nil {
b.ErrorHandler = func(error) {}
}

if b.MaxBatchSize <= 0 {
b.MaxBatchSize = defaultMaxBatchSize
}
Expand All @@ -101,117 +125,119 @@ func NewBatcher[T any](fn BatcherFunc[T], cfg BatcherOptions) (*Batcher[T], erro
return b, nil
}

// Initialize the synchronization primitives and start batch processing.
// Start begins batch processing.
func (b *Batcher[T]) Start(ctx context.Context) error {
if b.ch != nil {
b.mu.Lock()
defer b.mu.Unlock()

if b.started {
return ErrBatcherAlreadyStarted
}

b.ch = make(chan T, b.QueueSize)
b.doneCh = make(chan struct{})
b.started = true

go func() {
processBatches(ctx, b.Func, b.ch, b.MaxBatchSize, b.MaxDelay, b.ErrorHandler)
b.processBatches(ctx)
close(b.doneCh)
}()

return nil
}

// Stop stops batch processing.
// Stop stops batch processing and flushes remaining items.
func (b *Batcher[T]) Stop() error {
if b.ch == nil {
b.mu.Lock()
if !b.started {
b.mu.Unlock()

return ErrBatcherNotStarted
}
Comment thread
sitole marked this conversation as resolved.

b.started = false
close(b.ch)
b.mu.Unlock()

<-b.doneCh
Comment thread
sitole marked this conversation as resolved.
b.ch = nil
b.doneCh = nil

return nil
}

// Push pushes new batched item into the batcher.
//
// Don't forget calling Start() before pushing items into the batcher.
func (b *Batcher[T]) Push(batchedItem T) (bool, error) {
if b.ch == nil {
return false, ErrBatcherNotStarted
// Push enqueues an item for batching.
// Returns ErrBatcherQueueFull immediately if the queue is full.
func (b *Batcher[T]) Push(item T) error {
b.mu.RLock()
defer b.mu.RUnlock()

if !b.started {
return ErrBatcherNotStarted
}

select {
case b.ch <- batchedItem:
return true, nil
case b.ch <- item:
mQueueLen.Record(context.Background(), int64(len(b.ch)), b.attrs)

return nil
default:
return false, nil
mItemsDropped.Add(context.Background(), 1, b.attrs)

return ErrBatcherQueueFull
}
}

// QueueLen returns the number of pending items, which weren't passed into
// BatcherFunc yet.
//
// Maximum number of pending items is Batcher.QueueSize.
// QueueLen returns the number of items pending in the queue.
//
// The channel field is read under the read lock because Start replaces it
// while holding the write lock; an unsynchronized read would be a data race
// with a concurrent Start. len of a nil channel is 0, so a batcher whose
// channel has not been created yet reports 0.
func (b *Batcher[T]) QueueLen() int {
	b.mu.RLock()
	defer b.mu.RUnlock()

	return len(b.ch)
}

func processBatches[T any](ctx context.Context, f BatcherFunc[T], ch <-chan T, maxBatchedItemBatchSize int, maxBatchedItemDelay time.Duration, errorHandler func(error)) {
func (b *Batcher[T]) processBatches(ctx context.Context) {
var (
batch []T
batchedItem T
ok bool
lastPushTime = time.Now()
batch []T
batchStartTime time.Time
)

ticker := time.NewTicker(b.MaxDelay)
defer ticker.Stop()

flush := func() {
if len(batch) == 0 {
return
}

mFlushWaitDuration.Record(ctx, time.Since(batchStartTime).Milliseconds(), b.attrs)
start := time.Now()
if err := b.Func(ctx, batch); err != nil {
b.ErrorHandler(err)
}

mFlushBatchSize.Record(ctx, int64(len(batch)), b.attrs)
mFlushDuration.Record(ctx, time.Since(start).Milliseconds(), b.attrs)
mQueueLen.Record(ctx, int64(len(b.ch)), b.attrs)

batch = batch[:0]
Comment thread
sitole marked this conversation as resolved.
ticker.Reset(b.MaxDelay)
}

for {
select {
case batchedItem, ok = <-ch:
case item, ok := <-b.ch:
if !ok {
call(ctx, f, batch, errorHandler)
flush()

return
}
batch = append(batch, batchedItem)
default:

if len(batch) == 0 {
batchedItem, ok = <-ch
// Flush what's left in the buffer if the batcher is stopped
if !ok {
call(ctx, f, batch, errorHandler)

return
}
batch = append(batch, batchedItem)
} else {
if delay := maxBatchedItemDelay - time.Since(lastPushTime); delay > 0 {
t := acquireTimer(delay)
select {
case batchedItem, ok = <-ch:
if !ok {
call(ctx, f, batch, errorHandler)

return
}
batch = append(batch, batchedItem)
case <-t.C:
}
releaseTimer(t)
}
batchStartTime = time.Now()
}
}

if len(batch) >= maxBatchedItemBatchSize || time.Since(lastPushTime) > maxBatchedItemDelay {
lastPushTime = time.Now()
call(ctx, f, batch, errorHandler)
batch = batch[:0]
}
}
}

func call[T any](ctx context.Context, f BatcherFunc[T], batch []T, errorHandler func(error)) {
if len(batch) > 0 {
err := f(ctx, batch)
if err != nil {
errorHandler(err)
batch = append(batch, item)
if len(batch) >= b.MaxBatchSize {
flush()
}
case <-ticker.C:
flush()
}
}
}
Loading
Loading