Skip to content

Commit 0640480

Browse files
authored
pkg/beholder: log chip ingress batch emitter drops on backpressure and send failure (#2070)
1 parent 4a2d236 commit 0640480

2 files changed

Lines changed: 41 additions & 2 deletions

File tree

pkg/beholder/batch_emitter_service.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -158,6 +158,7 @@ func (e *ChipIngressBatchEmitterService) emitInternal(ctx context.Context, body
158158
// cancelled contexts.
159159
if sendErr != nil {
160160
e.metrics.eventsDropped.Add(ctx, 1, metricAttrs)
161+
e.eng.Errorw("failed to emit to chip ingress", "error", sendErr, "domain", domain, "entity", entity)
161162
} else {
162163
e.metrics.eventsSent.Add(ctx, 1, metricAttrs)
163164
}
@@ -167,6 +168,7 @@ func (e *ChipIngressBatchEmitterService) emitInternal(ctx context.Context, body
167168
})
168169
if queueErr != nil {
169170
e.metrics.eventsDropped.Add(ctx, 1, metricAttrs)
171+
e.eng.Errorw("failed to queue message for chip ingress", "error", queueErr, "domain", domain, "entity", entity)
170172
if callback != nil {
171173
callback(queueErr)
172174
}

pkg/beholder/batch_emitter_service_test.go

Lines changed: 39 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ package beholder_test
22

33
import (
44
"context"
5+
"fmt"
56
"sync"
67
"testing"
78
"time"
@@ -13,6 +14,8 @@ import (
1314
"go.opentelemetry.io/otel/attribute"
1415
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
1516
"go.opentelemetry.io/otel/sdk/metric/metricdata"
17+
"go.uber.org/zap"
18+
"go.uber.org/zap/zaptest/observer"
1619

1720
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
1821
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
@@ -356,6 +359,8 @@ func TestChipIngressBatchEmitterService_EmitWithCallback(t *testing.T) {
356359
})
357360

358361
t.Run("callback receives error when buffer is full", func(t *testing.T) {
362+
lggr, observed := logger.TestObserved(t, zap.InfoLevel)
363+
359364
clientMock := mocks.NewClient(t)
360365
clientMock.EXPECT().Close().Return(nil).Maybe()
361366

@@ -380,7 +385,7 @@ func TestChipIngressBatchEmitterService_EmitWithCallback(t *testing.T) {
380385
cfg.ChipIngressSendInterval = 50 * time.Millisecond
381386
cfg.ChipIngressDrainTimeout = 200 * time.Millisecond
382387

383-
emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t))
388+
emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, lggr)
384389
require.NoError(t, err)
385390
require.NoError(t, emitter.Start(t.Context()))
386391
defer close(sendBlocked)
@@ -417,6 +422,15 @@ func TestChipIngressBatchEmitterService_EmitWithCallback(t *testing.T) {
417422
case <-time.After(time.Second):
418423
t.Fatal("callback was not invoked for dropped event")
419424
}
425+
426+
logs := observed.FilterMessage("failed to queue message for chip ingress")
427+
require.GreaterOrEqual(t, logs.Len(), 1, "expected error log for queue failure")
428+
entry := logs.All()[0]
429+
assert.Equal(t, zap.ErrorLevel, entry.Level)
430+
fieldMap := logFieldMap(entry)
431+
assert.Contains(t, fieldMap, "error")
432+
assert.Equal(t, "platform", fieldMap["domain"])
433+
assert.Equal(t, "TestEvent", fieldMap["entity"])
420434
})
421435

422436
t.Run("nil callback behaves like Emit", func(t *testing.T) {
@@ -490,6 +504,8 @@ func TestChipIngressBatchEmitterService_Metrics(t *testing.T) {
490504
reader, restore := useEmitterTestMeterProvider(t)
491505
defer restore()
492506

507+
lggr, observed := logger.TestObserved(t, zap.InfoLevel)
508+
493509
clientMock := mocks.NewClient(t)
494510
clientMock.EXPECT().Close().Return(nil).Maybe()
495511
done := make(chan struct{})
@@ -502,7 +518,7 @@ func TestChipIngressBatchEmitterService_Metrics(t *testing.T) {
502518
cfg := newTestConfig()
503519
cfg.ChipIngressMaxBatchSize = 1
504520
cfg.ChipIngressSendInterval = time.Second
505-
emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, newTestLogger(t))
521+
emitter, err := beholder.NewChipIngressBatchEmitterService(clientMock, cfg, lggr)
506522
require.NoError(t, err)
507523
require.NoError(t, emitter.Start(t.Context()))
508524

@@ -525,9 +541,30 @@ func TestChipIngressBatchEmitterService_Metrics(t *testing.T) {
525541
require.True(t, ok)
526542
dp := mustEmitterInt64SumPoint(t, sum, "domain", "platform", "entity", "MetricDropEvent")
527543
assert.GreaterOrEqual(t, dp.Value, int64(1))
544+
545+
logs := observed.FilterMessage("failed to emit to chip ingress")
546+
require.GreaterOrEqual(t, logs.Len(), 1, "expected error log for publish failure")
547+
entry := logs.All()[0]
548+
assert.Equal(t, zap.ErrorLevel, entry.Level)
549+
fieldMap := logFieldMap(entry)
550+
assert.Contains(t, fieldMap, "error")
551+
assert.Equal(t, "platform", fieldMap["domain"])
552+
assert.Equal(t, "MetricDropEvent", fieldMap["entity"])
528553
})
529554
}
530555

556+
// logFieldMap flattens the structured context fields of an observed log entry
// into a key -> string map, which the tests in this file use to assert on
// individual logged fields.
//
// A field whose Interface value is non-nil is rendered with fmt.Sprintf("%v");
// otherwise its String value is used when it is non-empty. A field that has
// neither is left out of the returned map.
func logFieldMap(entry observer.LoggedEntry) map[string]string {
557+
m := make(map[string]string)
558+
for _, f := range entry.Context {
559+
// Interface is checked first, so it wins when both Interface and String are set.
if f.Interface != nil {
560+
m[f.Key] = fmt.Sprintf("%v", f.Interface)
561+
} else if f.String != "" {
562+
m[f.Key] = f.String
563+
}
// NOTE(review): a field whose value lives only in some other member of the
// Field struct (presumably numeric fields), or whose String is empty, gets no
// entry at all — confirm that is acceptable for the assertions using this map.
564+
}
565+
return m
566+
}
567+
531568
func BenchmarkChipIngressBatchEmitterService_Emit(b *testing.B) {
532569
cfg := beholder.Config{
533570
ChipIngressBufferSize: uint(b.N + 10),

0 commit comments

Comments
 (0)