Skip to content

Commit c9fc51b

Browse files
authored
fix: decouple Beholder observation metrics from Prometheus scrape cycle (#21720)
* fix: decouple Beholder observation metrics from Prometheus scrape cycle * Address review comments * Add startOnce * Remove the unnecessary CAS loop * Fix tests & other refactor fixes * Fix * Review fixes
1 parent b405267 commit c9fc51b

3 files changed

Lines changed: 191 additions & 156 deletions

File tree

core/capabilities/ccip/oraclecreator/observation_metrics_collector.go

Lines changed: 77 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,20 @@ package oraclecreator
33
import (
44
"context"
55
"errors"
6-
"math"
76
"strings"
8-
"sync/atomic"
7+
"sync"
8+
"time"
99

1010
"github.com/prometheus/client_golang/prometheus"
1111
prometheus_dto "github.com/prometheus/client_model/go"
1212

1313
"github.com/smartcontractkit/chainlink-common/pkg/logger"
1414
)
1515

16+
// defaultPollingInterval is the interval at which the collector polls counters and publishes
17+
// deltas to Beholder, independent of Prometheus scrapes.
18+
const defaultPollingInterval = 10 * time.Second
19+
1620
// ObservationMetricsPublisher is the interface for publishing observation metrics to external destinations
1721
type ObservationMetricsPublisher interface {
1822
PublishMetric(ctx context.Context, metricName string, value float64, labels map[string]string)
@@ -22,7 +26,9 @@ type ObservationMetricsPublisher interface {
2226
type ObservationMetricsCollector struct {
2327
logger logger.Logger
2428
publisher ObservationMetricsPublisher
25-
cancel context.CancelFunc
29+
stop chan struct{}
30+
startOnce sync.Once
31+
stopOnce sync.Once
2632
constantLabels map[string]string // Prometheus labels (for WrapRegistererWith)
2733
beholderLabels map[string]string // Beholder labels (for metrics publishing)
2834

@@ -41,7 +47,7 @@ func NewObservationMetricsCollector(
4147
collector := &ObservationMetricsCollector{
4248
logger: logger,
4349
publisher: publisher,
44-
cancel: func() {},
50+
stop: make(chan struct{}),
4551
constantLabels: constantLabels,
4652
beholderLabels: beholderLabels,
4753
}
@@ -57,64 +63,86 @@ func (c *ObservationMetricsCollector) CreateWrappedRegisterer(baseRegisterer pro
5763
}
5864
}
5965

60-
// Close stops the collector
66+
// Start launches a background goroutine that polls the wrapped counters on the given interval
67+
// and publishes deltas to Beholder, independent of Prometheus scrapes.
68+
// Call Start after the wrapped registerer has been passed to libocr (i.e. after NewOracle),
69+
// so that the counters are already registered before the first poll fires.
70+
// Safe to call multiple times; only the first call starts the goroutine.
71+
func (c *ObservationMetricsCollector) Start(interval time.Duration) {
72+
if interval <= 0 {
73+
interval = defaultPollingInterval
74+
}
75+
c.startOnce.Do(func() {
76+
go func() {
77+
ticker := time.NewTicker(interval)
78+
defer ticker.Stop()
79+
for {
80+
select {
81+
case <-ticker.C:
82+
c.poll()
83+
case <-c.stop:
84+
return
85+
}
86+
}
87+
}()
88+
})
89+
}
90+
91+
// poll reads the current value of each wrapped counter and publishes any delta to Beholder.
92+
func (c *ObservationMetricsCollector) poll() {
93+
if c.sentObservationsCounter != nil {
94+
c.sentObservationsCounter.readAndPublish()
95+
}
96+
if c.includedObservationsCounter != nil {
97+
c.includedObservationsCounter.readAndPublish()
98+
}
99+
}
100+
101+
// Close stops the background polling goroutine. Safe to call multiple times.
61102
func (c *ObservationMetricsCollector) Close() error {
62-
c.cancel()
103+
c.stopOnce.Do(func() { close(c.stop) })
63104
return nil
64105
}
65106

66107
// wrappedCounter wraps a Prometheus collector (which may be a counter or wrappingCollector)
67108
// to intercept Collect() calls and track value changes
68109
type wrappedCounter struct {
69110
prometheus.Collector
70-
metricName string
71-
labels map[string]string // Beholder labels (for metrics publishing)
72-
publisher ObservationMetricsPublisher
73-
logger logger.Logger
74-
lastValueBits uint64 // stores float64 as bits for atomic operations
111+
metricName string
112+
labels map[string]string // Beholder labels (for metrics publishing)
113+
publisher ObservationMetricsPublisher
114+
logger logger.Logger
115+
lastValue float64
75116
}
76117

77-
// Collect intercepts metric collection to detect counter increments
78-
func (w *wrappedCounter) Collect(ch chan<- prometheus.Metric) {
79-
// Create a channel to intercept metrics
80-
interceptCh := make(chan prometheus.Metric, 10)
81-
82-
// Collect from the underlying collector
83-
go func() {
84-
w.Collector.Collect(interceptCh)
85-
close(interceptCh)
86-
}()
87-
88-
// Forward metrics and track counter value
89-
for m := range interceptCh {
90-
// Try to extract the counter value from the metric
118+
// readAndPublish reads the current counter value and publishes any delta to Beholder.
119+
// Only called sequentially by the background poller goroutine, so lastValue requires
120+
// no synchronisation.
121+
func (w *wrappedCounter) readAndPublish() {
122+
// Buffer sized to the typical max Prometheus series per counter (1 for a plain
123+
// Counter, more for a CounterVec). Sized generously to avoid blocking Collect.
124+
ch := make(chan prometheus.Metric, 16)
125+
w.Collect(ch)
126+
close(ch)
127+
128+
for m := range ch {
91129
var metricValue float64
92-
if err := extractCounterValue(m, &metricValue); err == nil {
93-
// Load the last value atomically
94-
lastBits := atomic.LoadUint64(&w.lastValueBits)
95-
lastValue := math.Float64frombits(lastBits)
96-
97-
if metricValue > lastValue {
98-
delta := metricValue - lastValue
99-
// Store the new value atomically
100-
atomic.StoreUint64(&w.lastValueBits, math.Float64bits(metricValue))
101-
102-
w.logger.Debugw("Observation metric incremented",
103-
"metric", w.metricName,
104-
"value", metricValue,
105-
"delta", delta,
106-
"labels", w.labels,
107-
)
108-
109-
if w.publisher != nil {
110-
// Publish the delta, not the cumulative value
111-
w.publisher.PublishMetric(context.Background(), w.metricName, delta, w.labels)
112-
}
130+
if err := extractCounterValue(m, &metricValue); err != nil {
131+
continue
132+
}
133+
delta := metricValue - w.lastValue
134+
if delta > 0 {
135+
w.lastValue = metricValue
136+
w.logger.Debugw("Observation metric incremented",
137+
"metric", w.metricName,
138+
"value", metricValue,
139+
"delta", delta,
140+
"labels", w.labels,
141+
)
142+
if w.publisher != nil {
143+
w.publisher.PublishMetric(context.Background(), w.metricName, delta, w.labels)
113144
}
114145
}
115-
116-
// Forward the metric to the actual channel
117-
ch <- m
118146
}
119147
}
120148

@@ -136,26 +164,6 @@ func extractCounterValue(m prometheus.Metric, value *float64) error {
136164
return errors.New("metric is not a counter")
137165
}
138166

139-
// Describe implements prometheus.Collector
140-
func (c *ObservationMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
141-
if c.sentObservationsCounter != nil {
142-
c.sentObservationsCounter.Describe(ch)
143-
}
144-
if c.includedObservationsCounter != nil {
145-
c.includedObservationsCounter.Describe(ch)
146-
}
147-
}
148-
149-
// Collect implements prometheus.Collector
150-
func (c *ObservationMetricsCollector) Collect(ch chan<- prometheus.Metric) {
151-
if c.sentObservationsCounter != nil {
152-
c.sentObservationsCounter.Collect(ch)
153-
}
154-
if c.includedObservationsCounter != nil {
155-
c.includedObservationsCounter.Collect(ch)
156-
}
157-
}
158-
159167
// interceptingRegisterer wraps a Prometheus registerer to intercept specific metric registrations
160168
type interceptingRegisterer struct {
161169
base prometheus.Registerer

0 commit comments

Comments
 (0)