diff --git a/go.mod b/go.mod index 12976c7d1f..4d06d704b1 100644 --- a/go.mod +++ b/go.mod @@ -160,3 +160,5 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) + +replace github.com/smartcontractkit/chainlink-common/pkg/chipingress => ./pkg/chipingress diff --git a/pkg/beholder/chip_ingress_batch_emitter_service.go b/pkg/beholder/chip_ingress_batch_emitter_service.go new file mode 100644 index 0000000000..e306ebd1d4 --- /dev/null +++ b/pkg/beholder/chip_ingress_batch_emitter_service.go @@ -0,0 +1,204 @@ +package beholder + +import ( + "context" + "fmt" + "sync" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/batch" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +// ChipIngressBatchEmitterService batches events and sends them via chipingress.Client.PublishBatch. +// It implements the Emitter interface. +type ChipIngressBatchEmitterService struct { + services.Service + eng *services.Engine + + batchClient *batch.Client + + metricAttrsCache sync.Map // map[string]otelmetric.MeasurementOption + metrics batchEmitterMetrics +} + +type batchEmitterMetrics struct { + eventsSent otelmetric.Int64Counter + eventsDropped otelmetric.Int64Counter +} + +// NewChipIngressBatchEmitterService creates a batch emitter service backed by the given chipingress client. +func NewChipIngressBatchEmitterService(client chipingress.Client, cfg Config, lggr logger.Logger) (*ChipIngressBatchEmitterService, error) { + if client == nil { + return nil, fmt.Errorf("chip ingress client is nil") + } + + bufferSize := int(cfg.ChipIngressBufferSize) + if bufferSize == 0 { + bufferSize = 1000 + } + maxBatchSize := int(cfg.ChipIngressMaxBatchSize) + if maxBatchSize == 0 { + maxBatchSize = 500 + } + maxConcurrentSends := cfg.ChipIngressMaxConcurrentSends + if maxConcurrentSends == 0 { + maxConcurrentSends = defaultMaxConcurrentSends + } + sendInterval := cfg.ChipIngressSendInterval + if sendInterval == 0 { + sendInterval = 100 * time.Millisecond + } + sendTimeout := cfg.ChipIngressSendTimeout + if sendTimeout == 0 { + sendTimeout = 3 * time.Second + } + drainTimeout := cfg.ChipIngressDrainTimeout + if drainTimeout == 0 { + drainTimeout = 10 * time.Second + } + + meter := otel.Meter("beholder/chip_ingress_batch_emitter") + metrics, err := newBatchEmitterMetrics(meter) + if err != nil { + return nil, fmt.Errorf("failed to create batch emitter metrics: %w", err) + } + + batchClient, err := batch.NewBatchClient(client, + batch.WithBatchSize(maxBatchSize), + batch.WithMessageBuffer(bufferSize), + batch.WithBatchInterval(sendInterval), + batch.WithMaxPublishTimeout(sendTimeout), + batch.WithShutdownTimeout(drainTimeout), + batch.WithMaxConcurrentSends(maxConcurrentSends), + batch.WithEventClone(false), + ) + if err != nil { + return nil, fmt.Errorf("failed to create batch client: %w", err) + } + + e := &ChipIngressBatchEmitterService{ + batchClient: batchClient, + metrics: metrics, + } + + e.Service, e.eng = services.Config{ + Name: "ChipIngressBatchEmitterService", + Start: e.start, + Close: e.stop, + }.NewServiceEngine(lggr) + + return e, nil +} + +func (e *ChipIngressBatchEmitterService) start(ctx context.Context) error { + e.batchClient.Start(ctx) + return nil +} 
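+
+// Usage sketch (illustrative only; chipClient, lggr, ctx, and payload are assumed
+// caller-provided and are not defined in this package):
+//
+//	svc, err := NewChipIngressBatchEmitterService(chipClient, Config{}, lggr)
+//	if err != nil {
+//		// handle constructor error
+//	}
+//	_ = svc.Start(ctx)
+//	defer svc.Close()
+//	_ = svc.Emit(ctx, payload,
+//		AttrKeyDomain, "platform",
+//		AttrKeyEntity, "MyEvent",
+//	)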
+ +func (e *ChipIngressBatchEmitterService) stop() error { + e.batchClient.Stop() + return nil +} + +// Emit queues an event for batched delivery without blocking. +// Returns an error if the emitter is stopped or the context is cancelled. +// If the buffer is full, the event is silently dropped. +func (e *ChipIngressBatchEmitterService) Emit(ctx context.Context, body []byte, attrKVs ...any) error { + return e.emitInternal(ctx, body, nil, attrKVs...) +} + +// EmitWithCallback works like Emit but invokes callback once the event's fate +// is determined (nil on success, non-nil on failure or buffer-full drop). +// +// If EmitWithCallback returns a non-nil error, the callback will NOT be invoked. +// If it returns nil, the callback is guaranteed to fire exactly once. +func (e *ChipIngressBatchEmitterService) EmitWithCallback(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error { + return e.emitInternal(ctx, body, callback, attrKVs...) +} + +func (e *ChipIngressBatchEmitterService) emitInternal(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error { + return e.eng.IfStarted(func() error { + domain, entity, err := ExtractSourceAndType(attrKVs...) + if err != nil { + return err + } + + attributes := newAttributes(attrKVs...) + + event, err := chipingress.NewEvent(domain, entity, body, attributes) + if err != nil { + return fmt.Errorf("failed to create CloudEvent: %w", err) + } + eventPb, err := chipingress.EventToProto(event) + if err != nil { + return fmt.Errorf("failed to convert to proto: %w", err) + } + + if err := ctx.Err(); err != nil { + return err + } + + metricAttrs := e.metricAttrsFor(domain, entity) + metricsCtx := context.Background() + + queueErr := e.batchClient.QueueMessage(eventPb, func(sendErr error) { + if sendErr != nil { + e.metrics.eventsDropped.Add(metricsCtx, 1, metricAttrs) + } else { + e.metrics.eventsSent.Add(metricsCtx, 1, metricAttrs) + } + if callback != nil { + callback(sendErr) + } + }) + if queueErr != nil { + e.metrics.eventsDropped.Add(metricsCtx, 1, metricAttrs) + if callback != nil { + callback(queueErr) + } + } + + return nil + }) +} + +func (e *ChipIngressBatchEmitterService) metricAttrsFor(domain, entity string) otelmetric.MeasurementOption { + key := domain + "\x00" + entity + if v, ok := e.metricAttrsCache.Load(key); ok { + return v.(otelmetric.MeasurementOption) + } + attrs := otelmetric.WithAttributeSet(attribute.NewSet( + attribute.String("domain", domain), + attribute.String("entity", entity), + )) + v, _ := e.metricAttrsCache.LoadOrStore(key, attrs) + return v.(otelmetric.MeasurementOption) +} + +func newBatchEmitterMetrics(meter otelmetric.Meter) (batchEmitterMetrics, error) { + eventsSent, err := meter.Int64Counter("chip_ingress.events_sent", + otelmetric.WithDescription("Total events successfully sent via PublishBatch"), + otelmetric.WithUnit("{event}")) + if err != nil { + return batchEmitterMetrics{}, err + } + + eventsDropped, err := meter.Int64Counter("chip_ingress.events_dropped", + otelmetric.WithDescription("Total events dropped (buffer full or send failure)"), + otelmetric.WithUnit("{event}")) + if err != nil { + return batchEmitterMetrics{}, err + } + + return batchEmitterMetrics{ + eventsSent: eventsSent, + eventsDropped: eventsDropped, + }, nil +} diff --git a/pkg/beholder/chip_ingress_batch_emitter_service_test.go b/pkg/beholder/chip_ingress_batch_emitter_service_test.go new file mode 100644 index 0000000000..6bf44cb961 --- /dev/null +++ 
b/pkg/beholder/chip_ingress_batch_emitter_service_test.go @@ -0,0 +1,597 @@ +package beholder_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +func newTestConfig() beholder.Config { + return beholder.Config{ + ChipIngressBufferSize: 10, + ChipIngressMaxBatchSize: 5, + ChipIngressMaxConcurrentSends: 3, + ChipIngressSendInterval: 50 * time.Millisecond, + ChipIngressSendTimeout: 5 * time.Second, + ChipIngressDrainTimeout: 5 * time.Second, + } +} + +func newTestLogger(t *testing.T) logger.Logger { + t.Helper() + lggr, err := logger.New() + require.NoError(t, err) + t.Cleanup(func() { _ = lggr.Sync() }) + return lggr +} + +func TestNewChipIngressBatchEmitterService(t *testing.T) { + t.Run("happy path", func(t *testing.T) { + clientMock := mocks.NewClient(t) + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, newTestConfig(), newTestLogger(t)) + require.NoError(t, err) + assert.NotNil(t, emitter) + }) + + t.Run("returns error when client is nil", func(t *testing.T) { + emitter, err := beholder.NewChipIngressBatchEmitterService(nil, newTestConfig(), newTestLogger(t)) + assert.Error(t, err) + assert.Nil(t, emitter) + }) +} + +func TestChipIngressBatchEmitterService_Emit(t *testing.T) { + t.Run("returns error when domain/entity missing", func(t *testing.T) { + clientMock := mocks.NewClient(t) + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, newTestConfig(), newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + defer emitter.Close() //nolint:errcheck + + err = emitter.Emit(t.Context(), []byte("test"), "bad_key", "bad_value") + assert.Error(t, err) + }) + + t.Run("events are batched and sent via PublishBatch", func(t *testing.T) { + clientMock := mocks.NewClient(t) + + var mu sync.Mutex + var receivedBatches []*chipingress.CloudEventBatch + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + mu.Lock() + defer mu.Unlock() + batch := args.Get(1).(*chipingress.CloudEventBatch) + receivedBatches = append(receivedBatches, batch) + }). 
+ Return(nil, nil) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + for i := 0; i < 3; i++ { + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + } + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return len(receivedBatches) > 0 + }, 2*time.Second, 10*time.Millisecond) + + require.NoError(t, emitter.Close()) + + mu.Lock() + defer mu.Unlock() + + totalEvents := 0 + for _, batch := range receivedBatches { + totalEvents += len(batch.Events) + } + assert.Equal(t, 3, totalEvents) + }) +} + +func TestChipIngressBatchEmitterService_CloudEventFormat(t *testing.T) { + clientMock := mocks.NewClient(t) + + var mu sync.Mutex + var receivedBatch *chipingress.CloudEventBatch + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + mu.Lock() + defer mu.Unlock() + receivedBatch = args.Get(1).(*chipingress.CloudEventBatch) + }). + Return(nil, nil) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.Emit(t.Context(), []byte("test-payload"), + beholder.AttrKeyDomain, "my-domain", + beholder.AttrKeyEntity, "my-entity", + ) + require.NoError(t, err) + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return receivedBatch != nil + }, 2*time.Second, 10*time.Millisecond) + + require.NoError(t, emitter.Close()) + + mu.Lock() + defer mu.Unlock() + require.Len(t, receivedBatch.Events, 1) + + event := receivedBatch.Events[0] + assert.Equal(t, "my-domain", event.Source) + assert.Equal(t, "my-entity", event.Type) + assert.NotEmpty(t, event.Id) +} + +func TestChipIngressBatchEmitterService_PublishBatchError(t *testing.T) { + clientMock := mocks.NewClient(t) + + var mu sync.Mutex + callCount := 0 + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(_ mock.Arguments) { + mu.Lock() + defer mu.Unlock() + callCount++ + }). + Return(nil, assert.AnError) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + for i := 0; i < 3; i++ { + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + } + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return callCount > 0 + }, 2*time.Second, 10*time.Millisecond) + + require.NoError(t, emitter.Close()) +} + +func TestChipIngressBatchEmitterService_ContextCancellation(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil). 
+ Maybe() + + cfg := newTestConfig() + cfg.ChipIngressBufferSize = 1 + cfg.ChipIngressSendInterval = 10 * time.Second + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + defer emitter.Close() //nolint:errcheck + + ctx, cancel := context.WithCancel(t.Context()) + cancel() + + err = emitter.Emit(ctx, []byte("should-fail"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.ErrorIs(t, err, context.Canceled) +} + +func TestChipIngressBatchEmitterService_DefaultConfig(t *testing.T) { + clientMock := mocks.NewClient(t) + + var mu sync.Mutex + var receivedBatch *chipingress.CloudEventBatch + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + mu.Lock() + defer mu.Unlock() + receivedBatch = args.Get(1).(*chipingress.CloudEventBatch) + }). + Return(nil, nil) + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, beholder.Config{}, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return receivedBatch != nil + }, 3*time.Second, 50*time.Millisecond) + + require.NoError(t, emitter.Close()) + + mu.Lock() + defer mu.Unlock() + require.Len(t, receivedBatch.Events, 1) +} + +func TestChipIngressBatchEmitterService_EmitAfterClose(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil). + Maybe() + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, newTestConfig(), newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + require.NoError(t, emitter.Close()) + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.Error(t, err) +} + +func TestChipIngressBatchEmitterService_EmitWithCallback(t *testing.T) { + t.Run("callback receives nil on success", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + done := make(chan error, 1) + err = emitter.EmitWithCallback(t.Context(), []byte("body"), func(sendErr error) { + done <- sendErr + }, + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + + select { + case sendErr := <-done: + assert.NoError(t, sendErr) + case <-time.After(3 * time.Second): + t.Fatal("callback was not invoked within timeout") + } + + require.NoError(t, emitter.Close()) + }) + + t.Run("callback receives error on PublishBatch failure", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). 
+ Return(nil, assert.AnError) + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + done := make(chan error, 1) + err = emitter.EmitWithCallback(t.Context(), []byte("body"), func(sendErr error) { + done <- sendErr + }, + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + + select { + case sendErr := <-done: + assert.Error(t, sendErr) + case <-time.After(3 * time.Second): + t.Fatal("callback was not invoked within timeout") + } + + require.NoError(t, emitter.Close()) + }) + + t.Run("callback receives error when buffer is full", func(t *testing.T) { + clientMock := mocks.NewClient(t) + + sendBlocked := make(chan struct{}) + firstCallSignal := make(chan struct{}, 1) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Run(func(_ mock.Arguments) { + select { + case firstCallSignal <- struct{}{}: + default: + } + <-sendBlocked + }). + Return(nil, nil). + Maybe() + + cfg := newTestConfig() + cfg.ChipIngressBufferSize = 2 + cfg.ChipIngressMaxBatchSize = 1 + cfg.ChipIngressMaxConcurrentSends = 1 + cfg.ChipIngressSendInterval = 50 * time.Millisecond + cfg.ChipIngressDrainTimeout = 200 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + defer close(sendBlocked) + defer emitter.Close() //nolint:errcheck + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + require.NoError(t, err) + + <-firstCallSignal + time.Sleep(100 * time.Millisecond) + + for i := 0; i < 10; i++ { + _ = emitter.Emit(t.Context(), []byte("filler"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + } + + dropped := make(chan error, 1) + err = emitter.EmitWithCallback(t.Context(), []byte("overflow"), func(sendErr error) { + dropped <- sendErr + }, + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.NoError(t, err) + + select { + case dropErr := <-dropped: + assert.Error(t, dropErr) + case <-time.After(time.Second): + t.Fatal("callback was not invoked for dropped event") + } + }) + + t.Run("nil callback behaves like Emit", func(t *testing.T) { + clientMock := mocks.NewClient(t) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil). + Maybe() + + cfg := newTestConfig() + cfg.ChipIngressSendInterval = 50 * time.Millisecond + + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.EmitWithCallback(t.Context(), []byte("body"), nil, + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.NoError(t, err) + + require.NoError(t, emitter.Close()) + }) +} + +func TestChipIngressBatchEmitterService_Metrics(t *testing.T) { + t.Run("records events_sent on successful publish", func(t *testing.T) { + reader, restore := useEmitterTestMeterProvider(t) + defer restore() + + clientMock := mocks.NewClient(t) + done := make(chan struct{}) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, nil). + Run(func(_ mock.Arguments) { close(done) }). 
+ Once() + + cfg := newTestConfig() + cfg.ChipIngressMaxBatchSize = 1 + cfg.ChipIngressSendInterval = time.Second + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "MetricEvent", + ) + require.NoError(t, err) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for publish") + } + require.NoError(t, emitter.Close()) + + rm := collectEmitterMetrics(t, reader) + metric := mustEmitterMetric(t, rm, "chip_ingress.events_sent") + sum, ok := metric.Data.(metricdata.Sum[int64]) + require.True(t, ok) + dp := mustEmitterInt64SumPoint(t, sum, "domain", "platform", "entity", "MetricEvent") + assert.GreaterOrEqual(t, dp.Value, int64(1)) + }) + + t.Run("records events_dropped on publish error", func(t *testing.T) { + reader, restore := useEmitterTestMeterProvider(t) + defer restore() + + clientMock := mocks.NewClient(t) + done := make(chan struct{}) + clientMock. + On("PublishBatch", mock.Anything, mock.Anything). + Return(nil, assert.AnError). + Run(func(_ mock.Arguments) { close(done) }). + Once() + + cfg := newTestConfig() + cfg.ChipIngressMaxBatchSize = 1 + cfg.ChipIngressSendInterval = time.Second + emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t)) + require.NoError(t, err) + require.NoError(t, emitter.Start(t.Context())) + + err = emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "MetricDropEvent", + ) + require.NoError(t, err) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for publish") + } + require.NoError(t, emitter.Close()) + + rm := collectEmitterMetrics(t, reader) + metric := mustEmitterMetric(t, rm, "chip_ingress.events_dropped") + sum, ok := metric.Data.(metricdata.Sum[int64]) + require.True(t, ok) + dp := mustEmitterInt64SumPoint(t, sum, "domain", "platform", "entity", "MetricDropEvent") + assert.GreaterOrEqual(t, dp.Value, int64(1)) + }) +} + +func BenchmarkChipIngressBatchEmitterService_Emit(b *testing.B) { + cfg := beholder.Config{ + ChipIngressBufferSize: uint(b.N + 10), + ChipIngressMaxBatchSize: uint(b.N + 1), + ChipIngressMaxConcurrentSends: 1, + ChipIngressSendInterval: time.Hour, + ChipIngressSendTimeout: 5 * time.Second, + ChipIngressDrainTimeout: 5 * time.Second, + } + emitter, err := beholder.NewChipIngressBatchEmitterService(&chipingress.NoopClient{}, cfg, logger.Nop()) + if err != nil { + b.Fatal(err) + } + if err := emitter.Start(context.Background()); err != nil { + b.Fatal(err) + } + defer func() { _ = emitter.Close() }() + + payload := []byte("benchmark-payload") + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := emitter.Emit(context.Background(), payload, + beholder.AttrKeyDomain, "bench", + beholder.AttrKeyEntity, "BenchmarkEvent", + ); err != nil { + b.Fatal(err) + } + } +} + +func useEmitterTestMeterProvider(t *testing.T) (*sdkmetric.ManualReader, func()) { + t.Helper() + prev := otel.GetMeterProvider() + reader := sdkmetric.NewManualReader() + provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + otel.SetMeterProvider(provider) + return reader, func() { + require.NoError(t, provider.Shutdown(t.Context())) + otel.SetMeterProvider(prev) + } +} + +func collectEmitterMetrics(t *testing.T, reader *sdkmetric.ManualReader) 
metricdata.ResourceMetrics { + t.Helper() + var rm metricdata.ResourceMetrics + require.NoError(t, reader.Collect(t.Context(), &rm)) + return rm +} + +func mustEmitterMetric(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.Metrics { + t.Helper() + for _, sm := range rm.ScopeMetrics { + for _, metric := range sm.Metrics { + if metric.Name == name { + return metric + } + } + } + t.Fatalf("metric %q not found", name) + return metricdata.Metrics{} +} + +func mustEmitterInt64SumPoint(t *testing.T, sum metricdata.Sum[int64], k1, v1, k2, v2 string) metricdata.DataPoint[int64] { + t.Helper() + for _, dp := range sum.DataPoints { + if hasEmitterStringAttr(dp.Attributes, k1, v1) && hasEmitterStringAttr(dp.Attributes, k2, v2) { + return dp + } + } + t.Fatalf("sum datapoint not found for attrs %s=%s,%s=%s", k1, v1, k2, v2) + return metricdata.DataPoint[int64]{} +} + +func hasEmitterStringAttr(set attribute.Set, key, want string) bool { + for _, kv := range set.ToSlice() { + if string(kv.Key) == key { + return kv.Value.AsString() == want + } + } + return false +} diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index 328d877c49..8c0e1187bc 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "sync" "time" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -26,6 +27,8 @@ import ( "google.golang.org/grpc/credentials/insecure" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + pkglogger "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" ) const defaultGRPCCompressor = "gzip" @@ -37,6 +40,9 @@ type Emitter interface { } type Client struct { + services.Service + eng *services.Engine + Config Config // Logger Logger otellog.Logger @@ -60,6 +66,12 @@ type Client struct { // OnClose OnClose func() error + + // batchEmitterService is owned by the client and started/stopped via Client lifecycle. 
+ batchEmitterService *ChipIngressBatchEmitterService + + closeOnce sync.Once + closeErr error } // NewClient creates a new Client with initialized OpenTelemetry components @@ -187,7 +199,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro // This will eventually be removed in favor of chip-ingress emitter // and logs will be sent via OTLP using the regular Logger instead of calling Emit emitter := NewMessageEmitter(messageLogger) - + var batchEmitterService *ChipIngressBatchEmitterService var chipIngressClient chipingress.Client = &chipingress.NoopClient{} // if chip ingress is enabled, create dual source emitter that sends to both otel collector and chip ingress // eventually we will remove the dual source emitter and just use chip ingress @@ -223,38 +235,98 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro return nil, err } - chipIngressEmitter, err := NewChipIngressEmitter(chipIngressClient) - if err != nil { - return nil, fmt.Errorf("failed to create chip ingress emitter: %w", err) + var chipIngressEmitter Emitter + if cfg.ChipIngressBatchEmitterEnabled { + if cfg.ChipIngressLogger == nil { + return nil, fmt.Errorf("ChipIngressLogger is required when ChipIngressBatchEmitterEnabled is true") + } + batchEmitterService, err = NewChipIngressBatchEmitterService(chipIngressClient, cfg, cfg.ChipIngressLogger) + if err != nil { + return nil, fmt.Errorf("failed to create chip ingress batch emitter: %w", err) + } + chipIngressEmitter = batchEmitterService + } else { + chipIngressEmitter, err = NewChipIngressEmitter(chipIngressClient) + if err != nil { + return nil, fmt.Errorf("failed to create chip ingress emitter: %w", err) + } } - emitter, err = NewDualSourceEmitter(chipIngressEmitter, emitter) + emitter, err = NewDualSourceEmitter(chipIngressEmitter, emitter, cfg.ChipIngressBatchEmitterEnabled) if err != nil { return nil, fmt.Errorf("failed to create dual source emitter: %w", err) } } onClose := func() (err error) { - for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} { + for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider} { err = errors.Join(err, provider.Shutdown(context.Background())) } return } - return &Client{cfg, logger, tracer, meter, emitter, chipIngressClient, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, signer, onClose}, nil + c := &Client{ + Config: cfg, + Logger: logger, + Tracer: tracer, + Meter: meter, + Emitter: emitter, + Chip: chipIngressClient, + LoggerProvider: loggerProvider, + TracerProvider: tracerProvider, + MeterProvider: meterProvider, + MessageLoggerProvider: messageLoggerProvider, + lazySigner: signer, + OnClose: onClose, + batchEmitterService: batchEmitterService, + } + c.Service, c.eng = services.Config{ + Name: "BeholderClient", + Start: c.start, + Close: c.closeResources, + }.NewServiceEngine(pkglogger.Nop()) + return c, nil } -// Closes all providers, flushes all data and stops all background processes -func (c Client) Close() (err error) { - if c.Chip != nil { - err = errors.Join(err, c.Chip.Close()) +// ManagedServices returns services whose lifecycle is owned by the application +// (start, stop, health). Returns nil when the receiver is nil or has none. +func (c *Client) ManagedServices() []services.Service { + return nil +} + +// Close shuts down OTel providers and the chip ingress connection. 
+func (c *Client) Close() (err error) { + if c == nil { + return nil } - if c.Emitter != nil { - err = errors.Join(err, c.Emitter.Close()) + if c.Service != nil && c.eng != nil { + if err = c.eng.IfStarted(func() error { return nil }); err != nil { + return c.closeResources() + } + return c.Service.Close() } - if c.OnClose != nil { - err = errors.Join(err, c.OnClose()) + return c.closeResources() +} + +func (c *Client) start(ctx context.Context) error { + if c.batchEmitterService != nil { + return c.batchEmitterService.Start(ctx) } - return + return nil +} + +func (c *Client) closeResources() (err error) { + c.closeOnce.Do(func() { + if c.Emitter != nil { + c.closeErr = errors.Join(c.closeErr, c.Emitter.Close()) + } + if c.OnClose != nil { + c.closeErr = errors.Join(c.closeErr, c.OnClose()) + } + if c.Chip != nil { + c.closeErr = errors.Join(c.closeErr, c.Chip.Close()) + } + }) + return c.closeErr } // Returns a new Client with the same configuration but with a different package name diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go index f3e4561b68..deeed041fd 100644 --- a/pkg/beholder/config.go +++ b/pkg/beholder/config.go @@ -7,6 +7,8 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/trace" "go.uber.org/zap/zapcore" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" ) type Config struct { @@ -44,6 +46,16 @@ type Config struct { ChipIngressEmitterGRPCEndpoint string ChipIngressInsecureConnection bool // Disables TLS for Chip Ingress Emitter + // Chip Ingress Batch Emitter + ChipIngressBatchEmitterEnabled bool // When true, use batch emitter; when false (default), use legacy per-event emitter + ChipIngressBufferSize uint // Message buffer size (default 1000) + ChipIngressMaxBatchSize uint // Max events per PublishBatch call (default 500) + ChipIngressSendInterval time.Duration // Flush interval (default 100ms) + ChipIngressSendTimeout time.Duration // Timeout per PublishBatch call (default 3s) + ChipIngressDrainTimeout time.Duration // Max time to flush remaining events on shutdown (default 10s) + ChipIngressMaxConcurrentSends int // Max concurrent PublishBatch calls (default 10) + ChipIngressLogger logger.Logger // Required when ChipIngressBatchEmitterEnabled is true + // OTel Log LogExportTimeout time.Duration LogExportInterval time.Duration @@ -91,7 +103,8 @@ var defaultRetryConfig = RetryConfig{ } const ( - defaultPackageName = "beholder" + defaultPackageName = "beholder" + defaultMaxConcurrentSends = 10 ) var defaultOtelAttributes = []attribute.KeyValue{ @@ -131,8 +144,16 @@ func DefaultConfig() Config { LogMaxQueueSize: 2048, LogBatchProcessor: true, LogStreamingEnabled: true, // Enable logs streaming by default - LogLevel: zapcore.InfoLevel, - LogCompressor: "gzip", + LogLevel: zapcore.InfoLevel, + LogCompressor: "gzip", + // Chip Ingress Batch Emitter + ChipIngressBatchEmitterEnabled: false, + ChipIngressBufferSize: 1000, + ChipIngressMaxBatchSize: 500, + ChipIngressSendInterval: 100 * time.Millisecond, + ChipIngressSendTimeout: 3 * time.Second, + ChipIngressDrainTimeout: 10 * time.Second, + ChipIngressMaxConcurrentSends: defaultMaxConcurrentSends, // Auth (defaults to static auth mode with TTL=0) AuthHeadersTTL: 0, } diff --git a/pkg/beholder/config_test.go b/pkg/beholder/config_test.go index ae156db9c9..80bef78d3a 100644 --- a/pkg/beholder/config_test.go +++ b/pkg/beholder/config_test.go @@ -67,6 +67,6 @@ func ExampleConfig() { } fmt.Printf("%+v\n", *config.LogRetryConfig) // Output: - // {InsecureConnection:true CACertFile: 
OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: TraceCompressor:gzip MetricReaderInterval:1s MetricRetryConfig: MetricViews:[] MetricCompressor:gzip ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig: LogStreamingEnabled:false LogLevel:info LogCompressor:gzip AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner: AuthPublicKeyHex:} + // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: TraceCompressor:gzip MetricReaderInterval:1s MetricRetryConfig: MetricViews:[] MetricCompressor:gzip ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false ChipIngressBatchEmitterEnabled:false ChipIngressBufferSize:0 ChipIngressMaxBatchSize:0 ChipIngressSendInterval:0s ChipIngressSendTimeout:0s ChipIngressDrainTimeout:0s ChipIngressMaxConcurrentSends:0 ChipIngressLogger: LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig: LogStreamingEnabled:false LogLevel:info LogCompressor:gzip AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner: AuthPublicKeyHex:} // {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s} } diff --git a/pkg/beholder/dual_source_emitter.go b/pkg/beholder/dual_source_emitter.go index 5167efaece..5ce1c2b189 100644 --- a/pkg/beholder/dual_source_emitter.go +++ b/pkg/beholder/dual_source_emitter.go @@ -15,15 +15,16 @@ import ( // we want to send to both during the transition period, then cutover to using // chipIngressEmitter only type DualSourceEmitter struct { - chipIngressEmitter Emitter - otelCollectorEmitter Emitter - log logger.Logger - stopCh services.StopChan - wg services.WaitGroup - closed atomic.Bool + chipIngressEmitter Emitter + otelCollectorEmitter Emitter + log logger.Logger + chipIngressBatchEmitterEnabled bool + stopCh services.StopChan + wg services.WaitGroup + closed atomic.Bool } -func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitter) (Emitter, error) { +func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitter, chipIngressBatchEmitterEnabled bool) (Emitter, error) { if chipIngressEmitter == nil { return nil, fmt.Errorf("chip ingress emitter is nil") @@ -39,10 +40,11 @@ func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitt } return &DualSourceEmitter{ - chipIngressEmitter: chipIngressEmitter, - otelCollectorEmitter: otelCollectorEmitter, - log: logger, - stopCh: make(services.StopChan), + chipIngressEmitter: chipIngressEmitter, + otelCollectorEmitter: otelCollectorEmitter, + log: logger, + 
chipIngressBatchEmitterEnabled: chipIngressBatchEmitterEnabled, + stopCh: make(services.StopChan), }, nil } @@ -56,28 +58,31 @@ func (d *DualSourceEmitter) Close() error { } func (d *DualSourceEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { - - // Emit via OTLP first if err := d.otelCollectorEmitter.Emit(ctx, body, attrKVs...); err != nil { return err } - // Emit via chip ingress async - if err := d.wg.TryAdd(1); err != nil { - return err - } - go func(ctx context.Context) { - defer d.wg.Done() - var cancel context.CancelFunc - ctx, cancel = d.stopCh.Ctx(ctx) - defer cancel() - + if d.chipIngressBatchEmitterEnabled { if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil { - // If the chip ingress emitter fails, we ONLY log the error - // because we still want to send the data to the OTLP collector and not cause disruption d.log.Infof("failed to emit to chip ingress: %v", err) } - }(context.WithoutCancel(ctx)) + } else { + // Legacy ChipIngressEmitter.Emit is a synchronous gRPC call; + // fire-and-forget via goroutine to avoid blocking the caller. + if err := d.wg.TryAdd(1); err != nil { + return err + } + go func(ctx context.Context) { + defer d.wg.Done() + var cancel context.CancelFunc + ctx, cancel = d.stopCh.Ctx(ctx) + defer cancel() + + if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil { + d.log.Infof("failed to emit to chip ingress: %v", err) + } + }(context.WithoutCancel(ctx)) + } return nil } diff --git a/pkg/beholder/dual_source_emitter_test.go b/pkg/beholder/dual_source_emitter_test.go index 3429ab1648..314d82348f 100644 --- a/pkg/beholder/dual_source_emitter_test.go +++ b/pkg/beholder/dual_source_emitter_test.go @@ -3,7 +3,9 @@ package beholder_test import ( "context" "fmt" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -18,7 +20,7 @@ func TestNewDualSourceEmitter(t *testing.T) { chipEmitter := &mockEmitter{} otelEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, false) require.NoError(t, err) assert.NotNil(t, emitter) @@ -29,7 +31,7 @@ func TestNewDualSourceEmitter(t *testing.T) { t.Run("nil chip ingress emitter", func(t *testing.T) { otelEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(nil, otelEmitter) + emitter, err := beholder.NewDualSourceEmitter(nil, otelEmitter, false) assert.Error(t, err) assert.Nil(t, emitter) @@ -39,7 +41,7 @@ func TestNewDualSourceEmitter(t *testing.T) { t.Run("nil otel collector emitter", func(t *testing.T) { chipEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, nil) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, nil, false) assert.Error(t, err) assert.Nil(t, emitter) @@ -51,7 +53,7 @@ func TestDualSourceEmitterEmit(t *testing.T) { chipEmitter := &mockEmitter{} otelEmitter := &mockEmitter{} - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, false) require.NoError(t, err) err = emitter.Emit(t.Context(), []byte("test message"), "key", "value") @@ -67,7 +69,7 @@ func TestDualSourceEmitterEmit(t *testing.T) { }, } - emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter) + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, false) require.NoError(t, err) err = emitter.Emit(t.Context(), []byte("test 
message")) @@ -76,6 +78,58 @@ func TestDualSourceEmitterEmit(t *testing.T) { }) } +func TestDualSourceEmitterBlockingBehavior(t *testing.T) { + t.Run("legacy mode does not block caller", func(t *testing.T) { + var chipCalled atomic.Bool + chipEmitter := &mockEmitter{ + emitFunc: func(ctx context.Context, body []byte, attrKVs ...any) error { + time.Sleep(200 * time.Millisecond) + chipCalled.Store(true) + return nil + }, + } + otelEmitter := &mockEmitter{} + + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, false) + require.NoError(t, err) + + start := time.Now() + err = emitter.Emit(t.Context(), []byte("test")) + elapsed := time.Since(start) + + assert.NoError(t, err) + assert.Less(t, elapsed, 100*time.Millisecond, + "Emit should return immediately; chip ingress runs in a goroutine") + assert.False(t, chipCalled.Load(), + "chip ingress emit should still be in-flight when Emit returns") + + require.NoError(t, emitter.Close()) + assert.True(t, chipCalled.Load(), + "Close should wait for the in-flight chip ingress emit to finish") + }) + + t.Run("non-blocking mode emits inline", func(t *testing.T) { + var chipCalled atomic.Bool + chipEmitter := &mockEmitter{ + emitFunc: func(ctx context.Context, body []byte, attrKVs ...any) error { + chipCalled.Store(true) + return nil + }, + } + otelEmitter := &mockEmitter{} + + emitter, err := beholder.NewDualSourceEmitter(chipEmitter, otelEmitter, true) + require.NoError(t, err) + + err = emitter.Emit(t.Context(), []byte("test")) + assert.NoError(t, err) + assert.True(t, chipCalled.Load(), + "chip ingress emit should complete before Emit returns") + + require.NoError(t, emitter.Close()) + }) +} + // Mock emitter for testing type mockEmitter struct { emitFunc func(ctx context.Context, body []byte, attrKVs ...any) error diff --git a/pkg/beholder/global_test.go b/pkg/beholder/global_test.go index 8de51370ba..4b63faed19 100644 --- a/pkg/beholder/global_test.go +++ b/pkg/beholder/global_test.go @@ -76,6 +76,9 @@ func TestClient_SetGlobalOtelProviders(t *testing.T) { var b strings.Builder client, err := beholder.NewWriterClient(&b) require.NoError(t, err) + defer func() { + require.NoError(t, client.Close()) + }() // Set global Otel Client beholder.SetClient(client) diff --git a/pkg/beholder/httpclient.go b/pkg/beholder/httpclient.go index 693f581452..9f2d12ad8f 100644 --- a/pkg/beholder/httpclient.go +++ b/pkg/beholder/httpclient.go @@ -15,6 +15,9 @@ import ( sdkmetric "go.opentelemetry.io/otel/sdk/metric" sdkresource "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" + + pkglogger "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" ) // Used for testing to override the default exporter @@ -181,13 +184,32 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro } onClose := func() (err error) { - for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} { + for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider} { err = errors.Join(err, provider.Shutdown(context.Background())) } return } - // HTTP client doesn't currently support rotating auth, so lazySigner is always nil - return &Client{cfg, logger, tracer, meter, emitter, nil, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, nil, onClose}, nil + // HTTP client doesn't currently support rotating auth, so lazySigner 
is always nil. + c := &Client{ + Config: cfg, + Logger: logger, + Tracer: tracer, + Meter: meter, + Emitter: emitter, + Chip: nil, + LoggerProvider: loggerProvider, + TracerProvider: tracerProvider, + MeterProvider: meterProvider, + MessageLoggerProvider: messageLoggerProvider, + lazySigner: nil, + OnClose: onClose, + } + c.Service, c.eng = services.Config{ + Name: "BeholderClient", + Start: c.start, + Close: c.closeResources, + }.NewServiceEngine(pkglogger.Nop()) + return c, nil } func newHTTPTracerProvider(config Config, resource *sdkresource.Resource, tlsConfig *tls.Config) (*sdktrace.TracerProvider, error) { diff --git a/pkg/beholder/managed_services_test.go b/pkg/beholder/managed_services_test.go new file mode 100644 index 0000000000..f992050c53 --- /dev/null +++ b/pkg/beholder/managed_services_test.go @@ -0,0 +1,87 @@ +package beholder_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" +) + +func TestManagedServices_NilReceiver(t *testing.T) { + var c *beholder.Client + assert.Nil(t, c.ManagedServices()) +} + +func TestManagedServices_NoopClient(t *testing.T) { + c := beholder.NewNoopClient() + assert.Nil(t, c.ManagedServices()) +} + +func TestManagedServices_BatchDisabled(t *testing.T) { + client, err := beholder.NewClient(beholder.Config{ + OtelExporterGRPCEndpoint: "localhost:4317", + ChipIngressEmitterEnabled: true, + ChipIngressEmitterGRPCEndpoint: "localhost:9090", + ChipIngressInsecureConnection: true, + ChipIngressBatchEmitterEnabled: false, + }) + require.NoError(t, err) + defer func() { _ = client.Close() }() + + assert.Nil(t, client.ManagedServices()) +} + +func TestManagedServices_BatchEnabledCompatibilityShim(t *testing.T) { + lggr := newTestLogger(t) + + client, err := beholder.NewClient(beholder.Config{ + OtelExporterGRPCEndpoint: "localhost:4317", + ChipIngressEmitterEnabled: true, + ChipIngressEmitterGRPCEndpoint: "localhost:9090", + ChipIngressInsecureConnection: true, + ChipIngressBatchEmitterEnabled: true, + ChipIngressLogger: lggr, + ChipIngressBufferSize: 10, + ChipIngressMaxBatchSize: 5, + ChipIngressSendInterval: 50 * time.Millisecond, + ChipIngressSendTimeout: 1 * time.Second, + ChipIngressDrainTimeout: 1 * time.Second, + }) + require.NoError(t, err) + defer func() { _ = client.Close() }() + assert.Nil(t, client.ManagedServices()) +} + +func TestClientLifecycle_BatchEmitterNotAutoStarted(t *testing.T) { + lggr := newTestLogger(t) + + client, err := beholder.NewClient(beholder.Config{ + OtelExporterGRPCEndpoint: "localhost:4317", + ChipIngressEmitterEnabled: true, + ChipIngressEmitterGRPCEndpoint: "localhost:9090", + ChipIngressInsecureConnection: true, + ChipIngressBatchEmitterEnabled: true, + ChipIngressLogger: lggr, + ChipIngressBufferSize: 10, + ChipIngressMaxBatchSize: 5, + ChipIngressSendInterval: 50 * time.Millisecond, + ChipIngressSendTimeout: 1 * time.Second, + ChipIngressDrainTimeout: 1 * time.Second, + }) + require.NoError(t, err) + + // Before Start, client and emitter should report unready. 
+ assert.Error(t, client.Ready()) + err = client.Emitter.Emit(t.Context(), []byte("body"), + beholder.AttrKeyDomain, "platform", + beholder.AttrKeyEntity, "TestEvent", + ) + assert.Error(t, err) + + require.NoError(t, client.Start(t.Context())) + require.NoError(t, client.Ready()) + _ = client.Close() +} diff --git a/pkg/beholder/noop.go b/pkg/beholder/noop.go index 67580ec15c..c6ccfd83cf 100644 --- a/pkg/beholder/noop.go +++ b/pkg/beholder/noop.go @@ -8,6 +8,8 @@ import ( "time" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" "go.opentelemetry.io/otel/exporters/stdout/stdoutlog" "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" @@ -24,7 +26,7 @@ func NewNoopClient() *Client { cfg := DefaultConfig() // Logger loggerProvider := otellognoop.NewLoggerProvider() - logger := loggerProvider.Logger(defaultPackageName) + otelLogger := loggerProvider.Logger(defaultPackageName) // Tracer tracerProvider := oteltracenoop.NewTracerProvider() tracer := tracerProvider.Tracer(defaultPackageName) @@ -39,7 +41,25 @@ func NewNoopClient() *Client { // ChipIngress chipClient := &chipingress.NoopClient{} - return &Client{cfg, logger, tracer, meter, messageEmitter, chipClient, loggerProvider, tracerProvider, meterProvider, loggerProvider, nil, noopOnClose} + c := &Client{ + Config: cfg, + Logger: otelLogger, + Tracer: tracer, + Meter: meter, + Emitter: messageEmitter, + Chip: chipClient, + LoggerProvider: loggerProvider, + TracerProvider: tracerProvider, + MeterProvider: meterProvider, + MessageLoggerProvider: loggerProvider, + OnClose: noopOnClose, + } + c.Service, c.eng = services.Config{ + Name: "BeholderClient", + Start: c.start, + Close: c.closeResources, + }.NewServiceEngine(logger.Nop()) + return c } // NewStdoutClient creates a new Client with exporters which send telemetry data to standard output @@ -61,7 +81,7 @@ func NewWriterClient(w io.Writer) (*Client, error) { return NewNoopClient(), err } loggerProvider := sdklog.NewLoggerProvider(sdklog.WithProcessor(sdklog.NewSimpleProcessor(loggerExporter))) - logger := loggerProvider.Logger(defaultPackageName) + otelLogger := loggerProvider.Logger(defaultPackageName) // Tracer traceExporter, err := stdouttrace.New(cfg.TraceOptions...) 
@@ -89,7 +109,7 @@ func NewWriterClient(w io.Writer) (*Client, error) { meter := meterProvider.Meter(defaultPackageName) // MessageEmitter - emitter := messageEmitter{messageLogger: logger} + emitter := messageEmitter{messageLogger: otelLogger} onClose := func() (err error) { for _, provider := range []shutdowner{loggerProvider, tracerProvider, meterProvider} { @@ -98,7 +118,26 @@ func NewWriterClient(w io.Writer) (*Client, error) { return } - return &Client{Config: cfg.Config, Logger: logger, Tracer: tracer, Meter: meter, Emitter: emitter, Chip: &chipingress.NoopClient{}, LoggerProvider: loggerProvider, TracerProvider: tracerProvider, MeterProvider: meterProvider, MessageLoggerProvider: loggerProvider, lazySigner: nil, OnClose: onClose}, nil + c := &Client{ + Config: cfg.Config, + Logger: otelLogger, + Tracer: tracer, + Meter: meter, + Emitter: emitter, + Chip: &chipingress.NoopClient{}, + LoggerProvider: loggerProvider, + TracerProvider: tracerProvider, + MeterProvider: meterProvider, + MessageLoggerProvider: loggerProvider, + lazySigner: nil, + OnClose: onClose, + } + c.Service, c.eng = services.Config{ + Name: "BeholderClient", + Start: c.start, + Close: c.closeResources, + }.NewServiceEngine(logger.Nop()) + return c, nil } type noopMessageEmitter struct{} diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index e02704b26b..6033b9d271 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -9,6 +9,9 @@ import ( "time" cepb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -29,6 +32,7 @@ type seqnumKey struct { type Client struct { client chipingress.Client batchSize int + maxGRPCRequestSize int cloneEvent bool maxConcurrentSends chan struct{} batchInterval time.Duration @@ -42,6 +46,21 @@ type Client struct { batcherDone chan struct{} cancelBatcher context.CancelFunc counters sync.Map // map[seqnumKey]*atomic.Uint64 for per-(source,type) seqnum, cleared on Stop() + + metrics batchClientMetrics +} + +type batchClientMetrics struct { + sendRequestsTotal otelmetric.Int64Counter + sendFailuresTotal otelmetric.Int64Counter + requestSizeMessages otelmetric.Int64Histogram + requestSizeBytes otelmetric.Int64Histogram + requestLatencyMS otelmetric.Float64Histogram + configInfo otelmetric.Int64Gauge + batchSizeAttr otelmetric.MeasurementOption + maxGRPCReqSizeAttr otelmetric.MeasurementOption + successStatusAttr otelmetric.MeasurementOption + failureStatusAttr otelmetric.MeasurementOption } // Opt is a functional option for configuring the batch Client. @@ -53,6 +72,7 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { client: client, log: zap.NewNop().Sugar(), batchSize: 10, + maxGRPCRequestSize: 16 * 1024 * 1024, // Match chipingress maxMessageSize default. 
cloneEvent: true, maxConcurrentSends: make(chan struct{}, 1), messageBuffer: make(chan *messageWithCallback, 200), @@ -68,11 +88,19 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { opt(c) } + var err error + c.metrics, err = newBatchClientMetrics() + if err != nil { + return nil, err + } + return c, nil } // Start begins processing messages from the queue and sending them in batches func (b *Client) Start(ctx context.Context) { + b.metrics.recordConfig(context.Background(), b) + // Create a cancellable context for the batcher batcherCtx, cancel := context.WithCancel(ctx) b.cancelBatcher = cancel @@ -110,7 +138,9 @@ func (b *Client) Start(ctx context.Context) { // Forcibly shutdowns down after timeout if not completed. func (b *Client) Stop() { b.shutdownOnce.Do(func() { - ctx, cancel := b.stopCh.CtxWithTimeout(b.shutdownTimeout) + // Use a standalone timeout context so the shutdown wait isn't cancelled + // by close(b.stopCh) below. + ctx, cancel := context.WithTimeout(context.Background(), b.shutdownTimeout) defer cancel() if b.cancelBatcher != nil { @@ -230,7 +260,11 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) for i, msg := range messages { events[i] = msg.event } - _, err := b.client.PublishBatch(ctxTimeout, &chipingress.CloudEventBatch{Events: events}) + batchReq := &chipingress.CloudEventBatch{Events: events} + batchBytes := proto.Size(batchReq) + startedAt := time.Now() + _, err := b.client.PublishBatch(ctxTimeout, batchReq) + b.metrics.recordSend(context.Background(), len(messages), batchBytes, time.Since(startedAt), err == nil) if err != nil { b.log.Errorw("failed to publish batch", "error", err) } @@ -253,6 +287,13 @@ func WithBatchSize(batchSize int) Opt { } } +// WithMaxGRPCRequestSize sets the max gRPC request size in bytes used for metric comparison attributes. +func WithMaxGRPCRequestSize(maxReqSize int) Opt { + return func(c *Client) { + c.maxGRPCRequestSize = maxReqSize + } +} + // WithEventClone controls whether QueueMessage clones events before stamping seqnum and buffering. // Defaults to true for safety when caller reuses event pointers. 
func WithEventClone(clone bool) Opt { @@ -302,3 +343,113 @@ func WithLogger(log *zap.SugaredLogger) Opt { c.log = log } } + +func newBatchClientMetrics() (batchClientMetrics, error) { + meter := otel.Meter("chipingress/batch_client") + sendRequestsTotal, err := meter.Int64Counter( + "chip_ingress.batch.send_requests_total", + otelmetric.WithDescription("Total PublishBatch requests sent by batch client"), + otelmetric.WithUnit("{request}"), + ) + if err != nil { + return batchClientMetrics{}, err + } + sendFailuresTotal, err := meter.Int64Counter( + "chip_ingress.batch.send_failures_total", + otelmetric.WithDescription("Total failed PublishBatch requests sent by batch client"), + otelmetric.WithUnit("{request}"), + ) + if err != nil { + return batchClientMetrics{}, err + } + requestSizeMessages, err := meter.Int64Histogram( + "chip_ingress.batch.request_size_messages", + otelmetric.WithDescription("PublishBatch request size measured in number of events"), + otelmetric.WithUnit("{event}"), + ) + if err != nil { + return batchClientMetrics{}, err + } + requestSizeBytes, err := meter.Int64Histogram( + "chip_ingress.batch.request_size_bytes", + otelmetric.WithDescription("PublishBatch request size measured in bytes"), + otelmetric.WithUnit("By"), + ) + if err != nil { + return batchClientMetrics{}, err + } + requestLatencyMS, err := meter.Float64Histogram( + "chip_ingress.batch.request_latency_ms", + otelmetric.WithDescription("PublishBatch end-to-end latency in milliseconds"), + otelmetric.WithUnit("ms"), + ) + if err != nil { + return batchClientMetrics{}, err + } + configInfo, err := meter.Int64Gauge( + "chip_ingress.batch.config.info", + otelmetric.WithDescription("Batch client configuration info metric"), + otelmetric.WithUnit("{info}"), + ) + if err != nil { + return batchClientMetrics{}, err + } + + return batchClientMetrics{ + sendRequestsTotal: sendRequestsTotal, + sendFailuresTotal: sendFailuresTotal, + requestSizeMessages: requestSizeMessages, + requestSizeBytes: requestSizeBytes, + requestLatencyMS: requestLatencyMS, + configInfo: configInfo, + successStatusAttr: otelmetric.WithAttributeSet(attribute.NewSet( + attribute.String("status", "success"), + )), + failureStatusAttr: otelmetric.WithAttributeSet(attribute.NewSet( + attribute.String("status", "failure"), + )), + }, nil +} + +func (m *batchClientMetrics) recordConfig(ctx context.Context, c *Client) { + m.batchSizeAttr = otelmetric.WithAttributeSet(attribute.NewSet( + attribute.Int("max_batch_size", c.batchSize), + )) + m.maxGRPCReqSizeAttr = otelmetric.WithAttributeSet(attribute.NewSet( + attribute.Int("max_grpc_request_size_bytes", c.maxGRPCRequestSize), + )) + m.configInfo.Record(ctx, 1, otelmetric.WithAttributes( + attribute.Int("max_batch_size", c.batchSize), + attribute.Int("message_buffer_size", cap(c.messageBuffer)), + attribute.Int("max_concurrent_sends", cap(c.maxConcurrentSends)), + attribute.Int64("batch_interval_ms", c.batchInterval.Milliseconds()), + attribute.Int64("max_publish_timeout_ms", c.maxPublishTimeout.Milliseconds()), + attribute.Int64("shutdown_timeout_ms", c.shutdownTimeout.Milliseconds()), + attribute.Bool("clone_event", c.cloneEvent), + attribute.Int("max_grpc_request_size_bytes", c.maxGRPCRequestSize), + )) +} + +func (m *batchClientMetrics) recordSend(ctx context.Context, messageCount int, requestBytes int, latency time.Duration, success bool) { + statusAttr := m.successStatusAttr + if !success { + statusAttr = m.failureStatusAttr + } + m.sendRequestsTotal.Add(ctx, 1, statusAttr) + if !success { + 
+        m.sendFailuresTotal.Add(ctx, 1)
+    }
+
+    messageSizeOpts := []otelmetric.RecordOption{}
+    if m.batchSizeAttr != nil {
+        messageSizeOpts = append(messageSizeOpts, m.batchSizeAttr)
+    }
+    requestSizeOpts := []otelmetric.RecordOption{}
+    if m.maxGRPCReqSizeAttr != nil {
+        requestSizeOpts = append(requestSizeOpts, m.maxGRPCReqSizeAttr)
+    }
+
+    m.requestSizeMessages.Record(ctx, int64(messageCount), messageSizeOpts...)
+    m.requestSizeBytes.Record(ctx, int64(requestBytes), requestSizeOpts...)
+    m.requestLatencyMS.Record(ctx, float64(latency)/float64(time.Millisecond), statusAttr)
+}
diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go
index 7f8c356fb3..5c73e5906d 100644
--- a/pkg/chipingress/batch/client_test.go
+++ b/pkg/chipingress/batch/client_test.go
@@ -11,6 +11,10 @@ import (
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/require"
+    "go.opentelemetry.io/otel"
+    "go.opentelemetry.io/otel/attribute"
+    sdkmetric "go.opentelemetry.io/otel/sdk/metric"
+    "go.opentelemetry.io/otel/sdk/metric/metricdata"
 
     "github.com/smartcontractkit/chainlink-common/pkg/chipingress"
     "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks"
@@ -837,6 +841,11 @@ func TestStop(t *testing.T) {
     t.Run("QueueMessage returns error after Stop", func(t *testing.T) {
         mockClient := mocks.NewClient(t)
 
+        mockClient.
+            On("PublishBatch", mock.Anything, mock.Anything).
+            Return(&chipingress.PublishResponse{}, nil).
+            Maybe()
+
         client, err := NewBatchClient(mockClient, WithBatchSize(10))
         require.NoError(t, err)
 
@@ -853,7 +862,7 @@ func TestStop(t *testing.T) {
         }, nil)
         require.NoError(t, err)
 
-        // Stop the client
+        // Stop the client — drains any buffered messages
         client.Stop()
 
         // Queue message after stop - should fail
@@ -1109,3 +1118,237 @@ func TestSeqnum(t *testing.T) {
         }
     })
 }
+
+func TestBatchClient_Metrics(t *testing.T) {
+    t.Run("records success path metrics", func(t *testing.T) {
+        reader, restore := useTestMeterProvider(t)
+        defer restore()
+
+        mockClient := mocks.NewClient(t)
+        done := make(chan struct{})
+        mockClient.
+            On("PublishBatch", mock.Anything, mock.Anything).
+            Return(&chipingress.PublishResponse{}, nil).
+            Run(func(_ mock.Arguments) { close(done) }).
+            Once()
+
+        client, err := NewBatchClient(
+            mockClient,
+            WithBatchSize(1),
+            WithBatchInterval(time.Second),
+            WithMessageBuffer(10),
+            WithMaxGRPCRequestSize(2048),
+        )
+        require.NoError(t, err)
+        client.Start(t.Context())
+
+        err = client.QueueMessage(&chipingress.CloudEventPb{
+            Id:     "metric-success",
+            Source: "platform",
+            Type:   "MetricSuccess",
+        }, nil)
+        require.NoError(t, err)
+
+        select {
+        case <-done:
+        case <-time.After(time.Second):
+            t.Fatal("timeout waiting for PublishBatch")
+        }
+
+        client.Stop()
+        rm := collectResourceMetrics(t, reader)
+
+        reqTotal := mustMetric(t, rm, "chip_ingress.batch.send_requests_total")
+        reqSum, ok := reqTotal.Data.(metricdata.Sum[int64])
+        require.True(t, ok)
+        successPoint := mustInt64SumPointWithAttr(t, reqSum, "status", "success")
+        assert.GreaterOrEqual(t, successPoint.Value, int64(1))
+
+        msgSize := mustMetric(t, rm, "chip_ingress.batch.request_size_messages")
+        msgSizeHist, ok := msgSize.Data.(metricdata.Histogram[int64])
+        require.True(t, ok)
+        msgSizePoint := mustInt64HistogramPointWithIntAttr(t, msgSizeHist, "max_batch_size", 1)
+        assert.GreaterOrEqual(t, msgSizePoint.Count, uint64(1))
+
+        reqSize := mustMetric(t, rm, "chip_ingress.batch.request_size_bytes")
+        reqSizeHist, ok := reqSize.Data.(metricdata.Histogram[int64])
+        require.True(t, ok)
+        reqSizePoint := mustInt64HistogramPointWithIntAttr(t, reqSizeHist, "max_grpc_request_size_bytes", 2048)
+        assert.GreaterOrEqual(t, reqSizePoint.Count, uint64(1))
+
+        latency := mustMetric(t, rm, "chip_ingress.batch.request_latency_ms")
+        latencyHist, ok := latency.Data.(metricdata.Histogram[float64])
+        require.True(t, ok)
+        latencyPoint := mustFloat64HistogramPointWithAttr(t, latencyHist, "status", "success")
+        assert.GreaterOrEqual(t, latencyPoint.Count, uint64(1))
+
+        config := mustMetric(t, rm, "chip_ingress.batch.config.info")
+        configGauge, ok := config.Data.(metricdata.Gauge[int64])
+        require.True(t, ok)
+        require.NotEmpty(t, configGauge.DataPoints)
+        assert.Equal(t, int64(1), configGauge.DataPoints[0].Value)
+        assert.True(t, hasIntAttr(configGauge.DataPoints[0].Attributes, "max_batch_size", 1))
+        assert.True(t, hasIntAttr(configGauge.DataPoints[0].Attributes, "max_grpc_request_size_bytes", 2048))
+    })
+
+    t.Run("records failure counters and latency", func(t *testing.T) {
+        reader, restore := useTestMeterProvider(t)
+        defer restore()
+
+        mockClient := mocks.NewClient(t)
+        done := make(chan struct{})
+        mockClient.
+            On("PublishBatch", mock.Anything, mock.Anything).
+            Return(&chipingress.PublishResponse{}, assert.AnError).
+            Run(func(_ mock.Arguments) { close(done) }).
+            Once()
+
+        client, err := NewBatchClient(mockClient, WithBatchSize(1), WithMessageBuffer(10))
+        require.NoError(t, err)
+        client.Start(t.Context())
+
+        err = client.QueueMessage(&chipingress.CloudEventPb{
+            Id:     "metric-failure",
+            Source: "platform",
+            Type:   "MetricFailure",
+        }, nil)
+        require.NoError(t, err)
+
+        select {
+        case <-done:
+        case <-time.After(time.Second):
+            t.Fatal("timeout waiting for PublishBatch")
+        }
+
+        client.Stop()
+        rm := collectResourceMetrics(t, reader)
+
+        reqTotal := mustMetric(t, rm, "chip_ingress.batch.send_requests_total")
+        reqSum, ok := reqTotal.Data.(metricdata.Sum[int64])
+        require.True(t, ok)
+        failureReq := mustInt64SumPointWithAttr(t, reqSum, "status", "failure")
+        assert.GreaterOrEqual(t, failureReq.Value, int64(1))
+
+        failures := mustMetric(t, rm, "chip_ingress.batch.send_failures_total")
+        failuresSum, ok := failures.Data.(metricdata.Sum[int64])
+        require.True(t, ok)
+        require.NotEmpty(t, failuresSum.DataPoints)
+        assert.GreaterOrEqual(t, failuresSum.DataPoints[0].Value, int64(1))
+
+        latency := mustMetric(t, rm, "chip_ingress.batch.request_latency_ms")
+        latencyHist, ok := latency.Data.(metricdata.Histogram[float64])
+        require.True(t, ok)
+        failureLatency := mustFloat64HistogramPointWithAttr(t, latencyHist, "status", "failure")
+        assert.GreaterOrEqual(t, failureLatency.Count, uint64(1))
+    })
+}
+
+func BenchmarkBatchClient_QueueMessage(b *testing.B) {
+    client, err := NewBatchClient(
+        &chipingress.NoopClient{},
+        WithBatchSize(b.N+1),
+        WithMessageBuffer(b.N+10),
+        WithBatchInterval(time.Hour),
+    )
+    if err != nil {
+        b.Fatal(err)
+    }
+    client.Start(context.Background())
+    defer client.Stop()
+
+    payload := &chipingress.CloudEventPb{
+        Source: "bench",
+        Type:   "bench.event",
+    }
+
+    b.ResetTimer()
+    for i := 0; i < b.N; i++ {
+        payload.Id = strconv.Itoa(i)
+        if err := client.QueueMessage(payload, nil); err != nil {
+            b.Fatal(err)
+        }
+    }
+}
+
+func useTestMeterProvider(t *testing.T) (*sdkmetric.ManualReader, func()) {
+    t.Helper()
+    prev := otel.GetMeterProvider()
+    reader := sdkmetric.NewManualReader()
+    provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
+    otel.SetMeterProvider(provider)
+    return reader, func() {
+        require.NoError(t, provider.Shutdown(t.Context()))
+        otel.SetMeterProvider(prev)
+    }
+}
+
+func collectResourceMetrics(t *testing.T, reader *sdkmetric.ManualReader) metricdata.ResourceMetrics {
+    t.Helper()
+    var rm metricdata.ResourceMetrics
+    require.NoError(t, reader.Collect(t.Context(), &rm))
+    return rm
+}
+
+func mustMetric(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.Metrics {
+    t.Helper()
+    for _, sm := range rm.ScopeMetrics {
+        for _, m := range sm.Metrics {
+            if m.Name == name {
+                return m
+            }
+        }
+    }
+    t.Fatalf("metric %q not found", name)
+    return metricdata.Metrics{}
+}
+
+func mustInt64SumPointWithAttr(t *testing.T, sum metricdata.Sum[int64], key, want string) metricdata.DataPoint[int64] {
+    t.Helper()
+    for _, dp := range sum.DataPoints {
+        if hasStringAttr(dp.Attributes, key, want) {
+            return dp
+        }
+    }
+    t.Fatalf("sum datapoint with %s=%s not found", key, want)
+    return metricdata.DataPoint[int64]{}
+}
+
+func mustInt64HistogramPointWithIntAttr(t *testing.T, hist metricdata.Histogram[int64], key string, want int) metricdata.HistogramDataPoint[int64] {
+    t.Helper()
+    for _, dp := range hist.DataPoints {
+        if hasIntAttr(dp.Attributes, key, want) {
+            return dp
+        }
+    }
+    t.Fatalf("histogram datapoint with %s=%d not found", key, want)
+    return metricdata.HistogramDataPoint[int64]{}
+}
+
+func mustFloat64HistogramPointWithAttr(t *testing.T, hist metricdata.Histogram[float64], key, want string) metricdata.HistogramDataPoint[float64] {
+    t.Helper()
+    for _, dp := range hist.DataPoints {
+        if hasStringAttr(dp.Attributes, key, want) {
+            return dp
+        }
+    }
+    t.Fatalf("histogram datapoint with %s=%s not found", key, want)
+    return metricdata.HistogramDataPoint[float64]{}
+}
+
+func hasStringAttr(set attribute.Set, key, want string) bool {
+    for _, kv := range set.ToSlice() {
+        if string(kv.Key) == key {
+            return kv.Value.AsString() == want
+        }
+    }
+    return false
+}
+
+func hasIntAttr(set attribute.Set, key string, want int) bool {
+    for _, kv := range set.ToSlice() {
+        if string(kv.Key) == key {
+            return int(kv.Value.AsInt64()) == want
+        }
+    }
+    return false
+}
diff --git a/pkg/chipingress/go.mod b/pkg/chipingress/go.mod
index 1ac0640eab..9acc469ac8 100644
--- a/pkg/chipingress/go.mod
+++ b/pkg/chipingress/go.mod
@@ -8,7 +8,9 @@ require (
     github.com/google/uuid v1.6.0
     github.com/stretchr/testify v1.11.1
     go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0
+    go.opentelemetry.io/otel v1.43.0
     go.opentelemetry.io/otel/metric v1.43.0
+    go.opentelemetry.io/otel/sdk/metric v1.43.0
     go.opentelemetry.io/otel/trace v1.43.0
     go.uber.org/zap v1.27.0
     google.golang.org/grpc v1.79.3
@@ -26,9 +28,7 @@ require (
     github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
     github.com/stretchr/objx v0.5.2 // indirect
     go.opentelemetry.io/auto/sdk v1.2.1 // indirect
-    go.opentelemetry.io/otel v1.43.0 // indirect
     go.opentelemetry.io/otel/sdk v1.43.0 // indirect
-    go.opentelemetry.io/otel/sdk/metric v1.43.0 // indirect
     go.uber.org/multierr v1.11.0 // indirect
     golang.org/x/net v0.48.0 // indirect
     golang.org/x/sys v0.42.0 // indirect
diff --git a/pkg/loop/config.go b/pkg/loop/config.go
index 0335b8ce7b..40a4fb47f1 100644
--- a/pkg/loop/config.go
+++ b/pkg/loop/config.go
@@ -84,8 +84,9 @@ const (
     envTelemetryMetricCompressor = "CL_TELEMETRY_METRIC_COMPRESSOR"
     envTelemetryLogCompressor    = "CL_TELEMETRY_LOG_COMPRESSOR"
 
-    envChipIngressEndpoint           = "CL_CHIP_INGRESS_ENDPOINT"
-    envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION"
+    envChipIngressEndpoint            = "CL_CHIP_INGRESS_ENDPOINT"
+    envChipIngressInsecureConnection  = "CL_CHIP_INGRESS_INSECURE_CONNECTION"
+    envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED"
 
     envCRESettings        = cresettings.EnvNameSettings
     envCRESettingsDefault = cresettings.EnvNameSettingsDefault
@@ -98,6 +99,7 @@ type EnvConfig struct {
 
     ChipIngressEndpoint           string
     ChipIngressInsecureConnection bool
+    ChipIngressBatchEmitterEnabled bool
 
     CRESettings        string
     CRESettingsDefault string
@@ -255,6 +257,7 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {
 
     add(envChipIngressEndpoint, e.ChipIngressEndpoint)
     add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection))
+    add(envChipIngressBatchEmitterEnabled, strconv.FormatBool(e.ChipIngressBatchEmitterEnabled))
 
     if e.CRESettings != "" {
         add(envCRESettings, e.CRESettings)
@@ -486,6 +489,10 @@ func (e *EnvConfig) parse() error {
         if err != nil {
             return fmt.Errorf("failed to parse %s: %w", envChipIngressInsecureConnection, err)
         }
+        e.ChipIngressBatchEmitterEnabled, err = getBool(envChipIngressBatchEmitterEnabled)
+        if err != nil {
+            return fmt.Errorf("failed to parse %s: %w", envChipIngressBatchEmitterEnabled, err)
+        }
     }
 
     e.CRESettings = os.Getenv(envCRESettings)
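Reviewer note: the new flag is parsed in EnvConfig.parse alongside the other CL_CHIP_INGRESS_* variables. A hypothetical host-side sketch, not part of this patch, of handing it to a plugin subprocess; the binary path and endpoint are placeholders and the snippet assumes imports of "os" and "os/exec":

    cmd := exec.Command("/path/to/plugin")
    cmd.Env = append(os.Environ(),
        "CL_CHIP_INGRESS_ENDPOINT=chip-ingress.example.com:50051",
        "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED=true", // boolean string, e.g. "true" or "false"
    )
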
diff --git a/pkg/loop/config_test.go b/pkg/loop/config_test.go
index 34cd59a047..8f5f42f236 100644
--- a/pkg/loop/config_test.go
+++ b/pkg/loop/config_test.go
@@ -84,8 +84,9 @@ func TestEnvConfig_parse(t *testing.T) {
         envTelemetryEmitterMaxQueueSize: "1000",
         envTelemetryLogStreamingEnabled: "false",
 
-        envChipIngressEndpoint:           "chip-ingress.example.com:50051",
-        envChipIngressInsecureConnection: "true",
+        envChipIngressEndpoint:            "chip-ingress.example.com:50051",
+        envChipIngressInsecureConnection:  "true",
+        envChipIngressBatchEmitterEnabled: "false",
 
         envCRESettings:        `{"global":{}}`,
         envCRESettingsDefault: `{"foo":"bar"}`,
@@ -195,8 +196,9 @@ var envCfgFull = EnvConfig{
     TelemetryEmitterMaxQueueSize: 1000,
     TelemetryLogStreamingEnabled: false,
 
-    ChipIngressEndpoint:           "chip-ingress.example.com:50051",
-    ChipIngressInsecureConnection: true,
+    ChipIngressEndpoint:            "chip-ingress.example.com:50051",
+    ChipIngressInsecureConnection:  true,
+    ChipIngressBatchEmitterEnabled: false,
 
     CRESettings:        `{"global":{}}`,
     CRESettingsDefault: `{"foo":"bar"}`,
@@ -259,6 +261,7 @@ func TestEnvConfig_AsCmdEnv(t *testing.T) {
     // Assert ChipIngress environment variables
     assert.Equal(t, "chip-ingress.example.com:50051", got[envChipIngressEndpoint])
     assert.Equal(t, "true", got[envChipIngressInsecureConnection])
+    assert.Equal(t, "false", got[envChipIngressBatchEmitterEnabled])
 
     assert.Equal(t, `{"global":{}}`, got[envCRESettings])
     assert.Equal(t, `{"foo":"bar"}`, got[envCRESettingsDefault])
diff --git a/pkg/loop/server.go b/pkg/loop/server.go
index a7d92ee705..226dabc6c4 100644
--- a/pkg/loop/server.go
+++ b/pkg/loop/server.go
@@ -101,6 +101,7 @@ type Server struct {
     checker        *services.HealthChecker
     LimitsFactory  limits.Factory
     profiler       *pyroscope.Profiler
+    beholderClient *beholder.Client
 }
 
 func newServer(loggerName string) (*Server, error) {
@@ -180,6 +181,8 @@ func (s *Server) start(opts ...ServerOpt) error {
             ChipIngressEmitterEnabled:      s.EnvConfig.ChipIngressEndpoint != "",
             ChipIngressEmitterGRPCEndpoint: s.EnvConfig.ChipIngressEndpoint,
             ChipIngressInsecureConnection:  s.EnvConfig.ChipIngressInsecureConnection,
+            ChipIngressBatchEmitterEnabled: s.EnvConfig.ChipIngressBatchEmitterEnabled,
+            ChipIngressLogger:              s.Logger,
             MetricCompressor:               s.EnvConfig.TelemetryMetricCompressor,
         }
 
@@ -216,6 +219,10 @@ func (s *Server) start(opts ...ServerOpt) error {
         if err != nil {
             return fmt.Errorf("failed to create beholder client: %w", err)
         }
+        if err := beholderClient.Start(ctx); err != nil {
+            return fmt.Errorf("failed to start beholder client: %w", err)
+        }
+        s.beholderClient = beholderClient
         beholder.SetClient(beholderClient)
         beholder.SetGlobalOtelProviders()
 
@@ -351,6 +358,9 @@ func (s *Server) Register(c services.HealthReporter) error { return s.checker.Re
 
 // Stop closes resources and flushes logs.
 func (s *Server) Stop() {
+    if s.beholderClient != nil {
+        s.Logger.ErrorIfFn(s.beholderClient.Close, "Failed to close beholder client")
+    }
     if s.dbStatsReporter != nil {
         s.dbStatsReporter.Stop()
     }
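Reviewer note: the chip_ingress.batch.* instruments are created from the global MeterProvider (otel.Meter), which the LOOP server installs via beholder.SetGlobalOtelProviders above. A standalone consumer of the batch package would have to install a provider itself; a hedged sketch using the upstream OTel Go Prometheus exporter follows, with none of these packages introduced by this patch and the port chosen arbitrarily:

    package main

    import (
        "log"
        "net/http"

        "github.com/prometheus/client_golang/prometheus/promhttp"
        "go.opentelemetry.io/otel"
        otelprom "go.opentelemetry.io/otel/exporters/prometheus"
        sdkmetric "go.opentelemetry.io/otel/sdk/metric"
    )

    func main() {
        // Install a global MeterProvider backed by a Prometheus reader so the
        // batch client's instruments become scrapeable on :9464/metrics.
        exporter, err := otelprom.New()
        if err != nil {
            log.Fatal(err)
        }
        otel.SetMeterProvider(sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter)))

        http.Handle("/metrics", promhttp.Handler())
        log.Fatal(http.ListenAndServe(":9464", nil))
    }

The exporter typically rewrites dots in instrument names, so the counters would surface as chip_ingress_batch_send_requests_total and similar.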