Refactor beholder lifecycle and chip ingress batching#1862

Draft
thomaska wants to merge 1 commit into main from infoplat-3436-chipingress-publishBatch

Conversation


@thomaska thomaska commented Feb 27, 2026

Ticket: https://smartcontract-it.atlassian.net/browse/INFOPLAT-3436

Summary

This refactor makes beholder.Client the explicit lifecycle owner for chip-ingress batching and removes constructor-time background startup.

What changed

  • beholder.Client now owns start/stop for the optional batch emitter service
  • ChipIngressBatchEmitterService no longer starts runtime goroutines in the constructor
  • LOOP now starts/stops the beholder client directly instead of using ManagedServices()
  • batching paths no longer retain caller request contexts after enqueue
  • pkg/chipingress/batch.Client keeps simple single-owner Start/Stop semantics

Metrics

Added batch client metrics for:

  • request count / failures
  • batch size in messages
  • batch size in bytes
  • request latency
  • config info gauge

Tests

Added:

  • beholder lifecycle coverage
  • batch/emitter metric assertions
  • benchmark smoke coverage for batch queueing and emitter enqueue paths

Supports

smartcontractkit/chainlink#21327

Copilot AI review requested due to automatic review settings February 27, 2026 14:43
@thomaska thomaska requested a review from a team as a code owner February 27, 2026 14:43
@github-actions

👋 thomaska, thanks for creating this pull request!

To help reviewers, please consider creating future PRs as drafts first. This allows you to self-review and make any final changes before notifying the team.

Once you're ready, you can mark it as "Ready for review" to request feedback. Thanks!

Contributor

Copilot AI left a comment

Pull request overview

This pull request replaces the per-event ChIP Ingress emission with a batched approach to reduce overhead from N gRPC calls + N Kafka transactions to 1 call + 1 transaction per flush interval. The implementation introduces a new ChipIngressBatchEmitter that buffers events per (domain, entity) pair and flushes them periodically using PublishBatch.

Changes:

  • Introduced ChipIngressBatchEmitter with per-(domain, entity) worker goroutines for batching events
  • Added chipIngressEmitterWorker to handle batch assembly and sending with configurable timeouts
  • Removed goroutine wrapper from DualSourceEmitter.Emit() since batching is now non-blocking (channel send)
  • Added 4 new configuration parameters with sensible defaults (BufferSize: 100, MaxBatchSize: 50, SendInterval: 500ms, SendTimeout: 10s)
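The per-(domain, entity) worker described above can be sketched roughly like this, using the listed defaults; the names are illustrative, not the actual pkg/beholder identifiers:

```go
package main

import (
	"fmt"
	"time"
)

// worker buffers events for one (domain, entity) pair and flushes them
// periodically, mirroring the SendInterval / MaxBatchSize config above.
type worker struct {
	ch           chan []byte
	maxBatchSize int
	sendInterval time.Duration
	flush        func(batch [][]byte) // would call PublishBatch
	stop         chan struct{}
}

func (w *worker) run() {
	ticker := time.NewTicker(w.sendInterval)
	defer ticker.Stop()
	for {
		select {
		case <-w.stop:
			w.flush(w.drain()) // final drain on shutdown
			return
		case <-ticker.C:
			if batch := w.drain(); len(batch) > 0 {
				w.flush(batch)
			}
		}
	}
}

// drain collects up to maxBatchSize buffered payloads without blocking.
func (w *worker) drain() [][]byte {
	var batch [][]byte
	for len(batch) < w.maxBatchSize {
		select {
		case p := <-w.ch:
			batch = append(batch, p)
		default:
			return batch
		}
	}
	return batch
}

func main() {
	flushed := make(chan int, 1)
	w := &worker{
		ch:           make(chan []byte, 100), // BufferSize
		maxBatchSize: 50,                     // MaxBatchSize
		sendInterval: 10 * time.Millisecond,  // shortened from 500ms for the demo
		stop:         make(chan struct{}),
	}
	w.flush = func(b [][]byte) {
		if len(b) > 0 {
			flushed <- len(b)
		}
	}
	for i := 0; i < 3; i++ {
		w.ch <- []byte("event")
	}
	go w.run()
	fmt.Println(<-flushed) // 3
	close(w.stop)
}
```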

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.

Show a summary per file
File — Description
pkg/beholder/chip_ingress_batch_emitter.go — New batch emitter with per-worker buffering and periodic flushing via PublishBatch
pkg/beholder/chip_ingress_emitter_worker.go — Worker implementation handling batch assembly, channel draining, and exponential backoff logging for drops
pkg/beholder/chip_ingress_batch_emitter_test.go — Comprehensive test coverage (10 tests) for batching, max batch size, isolation, buffer overflow, lifecycle, errors, and defaults
pkg/beholder/dual_source_emitter.go — Simplified Emit() by removing the goroutine wrapper since ChipIngressBatchEmitter.Emit() is non-blocking
pkg/beholder/client.go — Updated to create and start ChipIngressBatchEmitter instead of ChipIngressEmitter; added a comment about closure ordering
pkg/beholder/config.go — Added 4 new config fields with inline documentation and default values
pkg/beholder/config_test.go — Updated expected output to include the new config fields
Comments suppressed due to low confidence (2)

pkg/beholder/config.go:50

  • The comment states "Zero disables batching" but the implementation in NewChipIngressBatchEmitter treats zero as "use default" and sets it to 500ms. The comment should be corrected to match the actual behavior, e.g., "Flush interval per worker (default 500ms when zero or unset)".
	ChipIngressSendInterval time.Duration // Flush interval per worker (default 500ms). Zero disables batching.

pkg/beholder/client.go:248

  • The messageLoggerProvider appears twice in the shutdowner slice. This will cause it to be shut down twice, which could lead to errors or undefined behavior. Remove one of the duplicate entries.
		for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} {
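A minimal sketch of the deduplicated loop the comment asks for; the shutdowner shape is assumed here (the real interface likely takes a context):

```go
package main

import "fmt"

// shutdowner is an assumed shape for the providers in the flagged slice.
type shutdowner interface{ Shutdown() error }

type provider struct{ name string }

func (p *provider) Shutdown() error { return nil }

// shutdownAll shuts each provider down exactly once, in slice order.
func shutdownAll(ps []shutdowner) int {
	n := 0
	for _, p := range ps {
		if err := p.Shutdown(); err == nil {
			n++
		}
	}
	return n
}

func main() {
	messageLoggerProvider := &provider{"messageLogger"}
	loggerProvider := &provider{"logger"}
	tracerProvider := &provider{"tracer"}
	meterProvider := &provider{"meter"}
	// messageLoggerProvider appears once, unlike the flagged line.
	n := shutdownAll([]shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider})
	fmt.Println(n) // 4
}
```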


@github-actions

github-actions bot commented Feb 27, 2026

⚠️ API Diff Results - github.com/smartcontractkit/chainlink-common

⚠️ Breaking Changes (2)

pkg/beholder (1)
  • NewDualSourceEmitter — Type changed:
func(
  Emitter, 
  Emitter, 
  + bool
)
(Emitter, error)
pkg/beholder.Client (1)
  • Close — 🗑️ Removed

✅ Compatible Changes (26)

pkg/beholder (2)
  • ChipIngressBatchEmitterService — ➕ Added

  • NewChipIngressBatchEmitterService — ➕ Added

pkg/beholder.(*Client) (1)
  • ManagedServices — ➕ Added
pkg/beholder.BeholderClient (1)
  • Service — ➕ Added
pkg/beholder.Client (1)
  • Service — ➕ Added
pkg/beholder.Config (8)
  • ChipIngressBatchEmitterEnabled — ➕ Added

  • ChipIngressBufferSize — ➕ Added

  • ChipIngressDrainTimeout — ➕ Added

  • ChipIngressLogger — ➕ Added

  • ChipIngressMaxBatchSize — ➕ Added

  • ChipIngressMaxConcurrentSends — ➕ Added

  • ChipIngressSendInterval — ➕ Added

  • ChipIngressSendTimeout — ➕ Added

pkg/beholder.writerClientConfig (8)
  • ChipIngressBatchEmitterEnabled — ➕ Added

  • ChipIngressBufferSize — ➕ Added

  • ChipIngressDrainTimeout — ➕ Added

  • ChipIngressLogger — ➕ Added

  • ChipIngressMaxBatchSize — ➕ Added

  • ChipIngressMaxConcurrentSends — ➕ Added

  • ChipIngressSendInterval — ➕ Added

  • ChipIngressSendTimeout — ➕ Added

pkg/loop.EnvConfig (1)
  • ChipIngressBatchEmitterEnabled — ➕ Added
pkg/services.HealthReporter (3)
  • HealthReport — ➕ Added

  • Name — ➕ Added

  • Ready — ➕ Added

pkg/services.Service (1)
  • Start — ➕ Added

📄 View full apidiff report

@smartcontractkit smartcontractkit deleted a comment from github-actions bot Feb 27, 2026
return nil, err
}

chipIngressEmitter, err := NewChipIngressEmitter(chipIngressClient)
Author

add a feature flag

// via chipingress.Client.PublishBatch on a periodic interval.
// It satisfies the Emitter interface so it can be used as a drop-in replacement
// for ChipIngressEmitter.
type ChipIngressBatchEmitter struct {
Author

name it Service

return e, nil
}

func (e *ChipIngressBatchEmitter) start(_ context.Context) error {

What's the role of this function if it always returns nil?

Author

It was mostly added as a placeholder, but it can be omitted as well.
After checking, it's also omitted in EventHandler in core/services/workflows/syncer/v2/handler.go, so omitting it is probably more consistent.


// NewChipIngressBatchEmitter creates a batch emitter backed by the given chipingress client.
// Call Start() to begin health monitoring, and Close() to stop all workers.
func NewChipIngressBatchEmitter(client chipingress.Client, lggr logger.Logger, cfg Config) (*ChipIngressBatchEmitter, error) {

This is purely stylistic, so feel free to ignore it: make the logger the last param, and after renaming the struct to ChipIngressBatchService, make sure to adjust the name of the constructor as well.

var events []chipingress.CloudEvent

for len(w.ch) > 0 && len(events) < int(w.maxBatchSize) { // #nosec G115
payload := <-w.ch

If I'm not mistaken, this can block if the channel is drained by another goroutine.

I'd use a select instead:

max := int(w.maxBatchSize)

for len(events) < max {
	select {
	case payload := <-w.ch:
		event, err := w.payloadToEvent(payload)
		if err != nil {
			w.lggr.Warnf("failed to build CloudEvent, dropping: %v", err)
			continue
		}
		events = append(events, event)
	default:
		return
	}
}


queueErr := e.batchClient.QueueMessage(eventPb, func(sendErr error) {
if sendErr != nil {
e.metrics.eventsDropped.Add(context.Background(), 1, metricAttrs)
Contributor

use the context passed in from the parameters

ctx, cancel := b.stopCh.CtxWithTimeout(b.shutdownTimeout)
// Use a standalone timeout context so the shutdown wait isn't cancelled
// by close(b.stopCh) below.
ctx, cancel := context.WithTimeout(context.Background(), b.shutdownTimeout)
Contributor

not sure I follow why we are doing this?

Author

@thomaska thomaska Mar 16, 2026

This is a potential issue that opus pointed out, and it made sense to me.
TL;DR: the timeout would never be respected and the drain didn't have time to run properly, because the same context was being cancelled right after it was created.
Longer version:
In L121, close(b.stopCh) closes the context,
which is the same one used in L113: ctx, cancel := b.stopCh.CtxWithTimeout(b.shutdownTimeout),
thus the "Done" branch in the following select executes instantaneously along with the warning message:

		select {
		case <-done:
			// All successfully shutdown
--->		case <-ctx.Done(): // timeout or context cancelled
			b.log.Warnw("timed out waiting for shutdown to finish, force closing", "timeout", b.shutdownTimeout)
		}

Does this make any sense to you?

Comment on lines 27 to 48
@@ -42,6 +43,7 @@ func NewDualSourceEmitter(chipIngressEmitter Emitter, otelCollectorEmitter Emitt
chipIngressEmitter: chipIngressEmitter,
otelCollectorEmitter: otelCollectorEmitter,
log: logger,
nonBlockingEmitter: nonBlockingChipIngress,
stopCh: make(services.StopChan),
}, nil
Contributor

it should always be non-blocking

Author

@thomaska thomaska Mar 16, 2026

The eventual implementation is always non-blocking; maybe the name is just not very descriptive?
Would something like chipIngressBatchEmitterEnabled be better? This is essentially the feature flag being propagated.

Comment on lines +69 to +85
} else {
// Legacy ChipIngressEmitter.Emit is a synchronous gRPC call;
// fire-and-forget via goroutine to avoid blocking the caller.
if err := d.wg.TryAdd(1); err != nil {
return err
}
go func(ctx context.Context) {
defer d.wg.Done()
var cancel context.CancelFunc
ctx, cancel = d.stopCh.Ctx(ctx)
defer cancel()

if err := d.chipIngressEmitter.Emit(ctx, body, attrKVs...); err != nil {
d.log.Infof("failed to emit to chip ingress: %v", err)
}
}(context.WithoutCancel(ctx))
}
Contributor

if we pass in the batch client, can simply just queue the message ?

Author

I'm not sure I understand :/ Should we completely remove the previous implementation and use the batch client everywhere? If so, should we remove the feature flag as well?

Contributor

disregard my comment

envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION"
envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT"
envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION"
envChipIngressBatchEmitterEnabled = "CL_CHIP_INGRESS_BATCH_EMITTER_ENABLED"
Contributor

maybe this should always be enabled

Author

I think we were discussing with @pkcll to merge this initially with the flag disabled

Contributor

makes sense

if sendErr != nil {
e.metrics.eventsDropped.Add(context.Background(), 1, metricAttrs)
} else {
e.metrics.eventsSent.Add(context.Background(), 1, metricAttrs)
Contributor

same as above

}
})
if queueErr != nil {
e.metrics.eventsDropped.Add(context.Background(), 1, metricAttrs)
Contributor

same as above

Contributor

@pkcll pkcll Mar 13, 2026

Please add metrics to batch client to observe batching behavior (could be done in separate PR)

  • batch req size (in messages) vs max batch size
  • batch req size in bytes vs max gRPC req size
  • req latency
  • [optional] report batch client configuration as a gauge metric like we do for beholder

pkcll
pkcll previously approved these changes Mar 16, 2026
if err != nil {
return nil, fmt.Errorf("failed to create chip ingress batch emitter: %w", err)
}
if err = batchEmitterService.Start(context.Background()); err != nil {
Contributor

You need to pass the parent component context to batchEmitterService.Start

if err != nil {
return nil, fmt.Errorf("failed to create chip ingress emitter: %w", err)
var chipIngressEmitter Emitter
if cfg.ChipIngressBatchEmitterEnabled {
Contributor

this code needs to be added to both grpc client and http beholder clients

Author

pkg/beholder/httpclient.go currently has no chip ingress client.
Should we add support for it in this PR?

Contributor

Please create a follow up ticket to sync httpclient with client.go implementation, they have diverged already and there are gaps

}

// NewChipIngressBatchEmitter creates a batch emitter backed by the given chipingress client.
func NewChipIngressBatchEmitter(client chipingress.Client, cfg Config, lggr logger.Logger) (*ChipIngressBatchEmitter, error) {
Contributor

You need to be able to pass a parent context to it so it can start and stop gracefully.

// and logs will be sent via OTLP using the regular Logger instead of calling Emit
emitter := NewMessageEmitter(messageLogger)

var batchEmitterService *ChipIngressBatchEmitter
Contributor

To avoid confusion, let's call ChipIngressBatchEmitter something with the word Service in it,
e.g. ChipIngressBatchEmitterService,
so that it's clear it's long-running and implements Start/Stop.

Contributor

pkg/beholder/chip_ingress_batch_emitter.go -> pkg/beholder/chip_ingress_batch_emitter_service.go ?

gopkg.in/yaml.v2 v2.4.0 // indirect
)

replace github.com/smartcontractkit/chainlink-common/pkg/chipingress => ./pkg/chipingress
Contributor

Please remove

Comment on lines +296 to +298
for i := len(s.managedServices) - 1; i >= 0; i-- {
s.Logger.ErrorIfFn(s.managedServices[i].Close, "Failed to close managed service")
}
Contributor

Are we not closing the beholder client? Let's do that instead, and let it handle these.

Refactor beholder lifecycle so ownership is explicit and centralized around beholder.Client, and move chip-ingress batching runtime startup out of constructors.

Main changes:
- make beholder.Client the top-level lifecycle owner for the optional ChipIngressBatchEmitterService instead of exposing child services to callers
- start the batch emitter from Client.Start(ctx) and shut it down from Client.Close(), keeping provider shutdown and chip client close ordering in one place
- keep ManagedServices() only as a compatibility shim and stop relying on it for LOOP lifecycle management
- update LOOP server startup/shutdown to manage the beholder client directly
- remove constructor-time goroutine startup from the batch emitter service so constructors only wire objects
- make emit-before-start / emit-after-close behavior explicit via service state
- stop retaining caller request contexts past enqueue in async batching paths
- align DualSourceEmitter flag naming with ChipIngressBatchEmitterEnabled to match the actual behavior contract
- fold in the LOOP env config rebase fix so chip ingress and CRE settings are declared exactly once

Chip-ingress batching:
- keep pkg/chipingress/batch.Client as a non-service batching primitive with single-owner Start/Stop semantics
- add batching metrics for request count/failures, batch size in messages, batch size in bytes, request latency, and batch config info
- expose max gRPC request size as a batch client option for metric comparison

Tests and validation:
- add lifecycle coverage for beholder client and managed-services compatibility
- add metric assertions for batch client and batch emitter using OTel metric collection
- add benchmark coverage for batch queueing and emitter enqueue paths
- verify with go test in ./pkg/loop, ./pkg/beholder, and ./batch under pkg/chipingress