Skip to content

Commit 4a2d236

Browse files
authored
pkg/beholder: add batch emitter service with service-engine lifecycle (#2059)
* pkg/beholder: add batch emitter service and bump chipingress batching dependency * chore: bump chipingress dependency to publishBatch * refactor: remove context.Background() in batch emitter service and tests Use caller ctx for OTel metric Add calls (non-blocking, tolerates cancelled contexts) and b.Context() in benchmarks. * chore: bump chipingress dependency to latest main * fix: use engine lifecycle context in batch emitter Start instead of startup ctx The services contract forbids retaining the startup context after Start returns. Use eng.NewCtx() to get a lifecycle-owned context that is cancelled when StopChan closes during service shutdown, rather than passing through the caller's startup context. * fix: use defaulted logger for legacy chip-ingress emitter to prevent nil panic * bump chipingress to main (bacfb6b), gomodtidy * refactor: use config structs for emitter/noop constructors preserving signatures Introduce ChipIngressEmitterConfig, DualSourceEmitterConfig, and NoopClientConfig structs with New() factory methods. The public constructors (NewChipIngressEmitter, NewDualSourceEmitter, NewNoopClient) retain their original signatures and delegate to the config structs. Internal callers that need a logger use the config struct directly; nil Lggr defaults to logger.Nop().
1 parent bacfb6b commit 4a2d236

22 files changed

Lines changed: 1297 additions & 125 deletions

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ require (
4242
github.com/scylladb/go-reflectx v1.0.1
4343
github.com/shopspring/decimal v1.4.0
4444
github.com/smartcontractkit/chain-selectors v1.0.89
45-
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10
45+
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260518142424-bacfb6ba4146
4646
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4
4747
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260514104516-a827acdffe43
4848
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b

go.sum

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
package beholder
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
"go.opentelemetry.io/otel"
9+
"go.opentelemetry.io/otel/attribute"
10+
otelmetric "go.opentelemetry.io/otel/metric"
11+
12+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
13+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/batch"
14+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
15+
"github.com/smartcontractkit/chainlink-common/pkg/services"
16+
)
17+
18+
// ChipIngressBatchEmitterService batches events and sends them via chipingress.Client.PublishBatch.
19+
// It implements the Emitter interface.
20+
type ChipIngressBatchEmitterService struct {
21+
services.Service
22+
eng *services.Engine
23+
24+
batchClient *batch.Client
25+
26+
metricAttrsCache sync.Map // map[string]otelmetric.MeasurementOption
27+
metrics batchEmitterMetrics
28+
}
29+
30+
type batchEmitterMetrics struct {
31+
eventsSent otelmetric.Int64Counter
32+
eventsDropped otelmetric.Int64Counter
33+
}
34+
35+
// NewChipIngressBatchEmitterService creates a batch emitter service backed by the given chipingress client.
36+
func NewChipIngressBatchEmitterService(client chipingress.Client, cfg Config, lggr logger.Logger) (*ChipIngressBatchEmitterService, error) {
37+
if client == nil {
38+
return nil, fmt.Errorf("chip ingress client is nil")
39+
}
40+
41+
defaults := DefaultConfig()
42+
bufferSize := int(cfg.ChipIngressBufferSize)
43+
if bufferSize == 0 {
44+
bufferSize = int(defaults.ChipIngressBufferSize)
45+
}
46+
maxBatchSize := int(cfg.ChipIngressMaxBatchSize)
47+
if maxBatchSize == 0 {
48+
maxBatchSize = int(defaults.ChipIngressMaxBatchSize)
49+
}
50+
maxConcurrentSends := cfg.ChipIngressMaxConcurrentSends
51+
if maxConcurrentSends == 0 {
52+
maxConcurrentSends = defaults.ChipIngressMaxConcurrentSends
53+
}
54+
sendInterval := cfg.ChipIngressSendInterval
55+
if sendInterval == 0 {
56+
sendInterval = defaults.ChipIngressSendInterval
57+
}
58+
sendTimeout := cfg.ChipIngressSendTimeout
59+
if sendTimeout == 0 {
60+
sendTimeout = defaults.ChipIngressSendTimeout
61+
}
62+
drainTimeout := cfg.ChipIngressDrainTimeout
63+
if drainTimeout == 0 {
64+
drainTimeout = defaults.ChipIngressDrainTimeout
65+
}
66+
67+
meter := otel.Meter("beholder/chip_ingress_batch_emitter")
68+
metrics, err := newBatchEmitterMetrics(meter)
69+
if err != nil {
70+
return nil, fmt.Errorf("failed to create batch emitter metrics: %w", err)
71+
}
72+
73+
batchClient, err := batch.NewBatchClient(client,
74+
batch.WithBatchSize(maxBatchSize),
75+
batch.WithMessageBuffer(bufferSize),
76+
batch.WithBatchInterval(sendInterval),
77+
batch.WithMaxPublishTimeout(sendTimeout),
78+
batch.WithShutdownTimeout(drainTimeout),
79+
batch.WithMaxConcurrentSends(maxConcurrentSends),
80+
batch.WithEventClone(false),
81+
)
82+
if err != nil {
83+
return nil, fmt.Errorf("failed to create batch client: %w", err)
84+
}
85+
86+
e := &ChipIngressBatchEmitterService{
87+
batchClient: batchClient,
88+
metrics: metrics,
89+
}
90+
91+
e.Service, e.eng = services.Config{
92+
Name: "ChipIngressBatchEmitterService",
93+
Start: e.start,
94+
Close: e.stop,
95+
}.NewServiceEngine(lggr)
96+
97+
return e, nil
98+
}
99+
100+
func (e *ChipIngressBatchEmitterService) start(_ context.Context) error {
101+
// Do not pass the startup ctx — the services contract forbids retaining it
102+
// after Start returns. Use the engine's lifecycle context so the batcher
103+
// is cancelled when the service shuts down (StopChan closes before stop() runs).
104+
ctx, _ := e.eng.NewCtx()
105+
e.batchClient.Start(ctx)
106+
return nil
107+
}
108+
109+
func (e *ChipIngressBatchEmitterService) stop() error {
110+
e.batchClient.Stop()
111+
return nil
112+
}
113+
114+
// Emit queues an event for batched delivery without blocking.
115+
// Returns an error if the emitter is stopped or the context is cancelled.
116+
// If the buffer is full, the event is silently dropped.
117+
func (e *ChipIngressBatchEmitterService) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
118+
return e.emitInternal(ctx, body, nil, attrKVs...)
119+
}
120+
121+
// EmitWithCallback works like Emit but invokes callback once the event's fate
122+
// is determined (nil on success, non-nil on failure or buffer-full drop).
123+
//
124+
// If EmitWithCallback returns a non-nil error, the callback will NOT be invoked.
125+
// If it returns nil, the callback is guaranteed to fire exactly once.
126+
func (e *ChipIngressBatchEmitterService) EmitWithCallback(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error {
127+
return e.emitInternal(ctx, body, callback, attrKVs...)
128+
}
129+
130+
func (e *ChipIngressBatchEmitterService) emitInternal(ctx context.Context, body []byte, callback func(error), attrKVs ...any) error {
131+
return e.eng.IfStarted(func() error {
132+
domain, entity, err := ExtractSourceAndType(attrKVs...)
133+
if err != nil {
134+
return err
135+
}
136+
137+
attributes := newAttributes(attrKVs...)
138+
139+
event, err := chipingress.NewEvent(domain, entity, body, attributes)
140+
if err != nil {
141+
return fmt.Errorf("failed to create CloudEvent: %w", err)
142+
}
143+
eventPb, err := chipingress.EventToProto(event)
144+
if err != nil {
145+
return fmt.Errorf("failed to convert to proto: %w", err)
146+
}
147+
148+
if err := ctx.Err(); err != nil {
149+
return err
150+
}
151+
152+
metricAttrs := e.metricAttrsFor(domain, entity)
153+
154+
queueErr := e.batchClient.QueueMessage(eventPb, func(sendErr error) {
155+
// The callback fires asynchronously after the batch is sent,
156+
// so the caller's ctx may already be cancelled. Use ctx directly
157+
// for metric recording — OTel Add is non-blocking and tolerates
158+
// cancelled contexts.
159+
if sendErr != nil {
160+
e.metrics.eventsDropped.Add(ctx, 1, metricAttrs)
161+
} else {
162+
e.metrics.eventsSent.Add(ctx, 1, metricAttrs)
163+
}
164+
if callback != nil {
165+
callback(sendErr)
166+
}
167+
})
168+
if queueErr != nil {
169+
e.metrics.eventsDropped.Add(ctx, 1, metricAttrs)
170+
if callback != nil {
171+
callback(queueErr)
172+
}
173+
}
174+
175+
return nil
176+
})
177+
}
178+
179+
func (e *ChipIngressBatchEmitterService) metricAttrsFor(domain, entity string) otelmetric.MeasurementOption {
180+
key := domain + "\x00" + entity
181+
if v, ok := e.metricAttrsCache.Load(key); ok {
182+
return v.(otelmetric.MeasurementOption)
183+
}
184+
attrs := otelmetric.WithAttributeSet(attribute.NewSet(
185+
attribute.String("domain", domain),
186+
attribute.String("entity", entity),
187+
))
188+
v, _ := e.metricAttrsCache.LoadOrStore(key, attrs)
189+
return v.(otelmetric.MeasurementOption)
190+
}
191+
192+
func newBatchEmitterMetrics(meter otelmetric.Meter) (batchEmitterMetrics, error) {
193+
eventsSent, err := meter.Int64Counter("chip_ingress.events_sent",
194+
otelmetric.WithDescription("Total events successfully sent via PublishBatch"),
195+
otelmetric.WithUnit("{event}"))
196+
if err != nil {
197+
return batchEmitterMetrics{}, err
198+
}
199+
200+
eventsDropped, err := meter.Int64Counter("chip_ingress.events_dropped",
201+
otelmetric.WithDescription("Total events dropped (buffer full or send failure)"),
202+
otelmetric.WithUnit("{event}"))
203+
if err != nil {
204+
return batchEmitterMetrics{}, err
205+
}
206+
207+
return batchEmitterMetrics{
208+
eventsSent: eventsSent,
209+
eventsDropped: eventsDropped,
210+
}, nil
211+
}

0 commit comments

Comments
 (0)