Skip to content

Commit 25b57d3

Browse files
committed
Refactor promotel
1 parent 387a991 commit 25b57d3

14 files changed

Lines changed: 2958 additions & 1172 deletions

File tree

core/cmd/shell.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
324324
RetirementReportCache: retirementReportCache,
325325
LLOTransmissionReaper: lloReaper,
326326
CapabilitiesRegistry: capabilitiesRegistry,
327+
Registerer: appRegisterer,
327328
})
328329
}
329330

core/scripts/go.mod

Lines changed: 145 additions & 56 deletions
Large diffs are not rendered by default.

core/scripts/go.sum

Lines changed: 461 additions & 123 deletions
Large diffs are not rendered by default.

core/services/chainlink/application.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"
3434
evmtypes "github.com/smartcontractkit/chainlink-integrations/evm/types"
3535
evmutils "github.com/smartcontractkit/chainlink-integrations/evm/utils"
36-
"github.com/smartcontractkit/chainlink/v2/core/services/promotel"
3736
"github.com/smartcontractkit/chainlink/v2/core/bridges"
3837
"github.com/smartcontractkit/chainlink/v2/core/build"
3938
"github.com/smartcontractkit/chainlink/v2/core/capabilities"
@@ -203,6 +202,7 @@ type ApplicationOpts struct {
203202
NewOracleFactoryFn standardcapabilities.NewOracleFactoryFn
204203
FetcherFunc syncer.FetcherFunc
205204
FetcherFactoryFn compute.FetcherFactory
205+
Registerer prometheus.Registerer
206206
}
207207

208208
type Heartbeat struct {
@@ -280,8 +280,17 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
280280
restrictedHTTPClient := opts.RestrictedHTTPClient
281281
unrestrictedHTTPClient := opts.UnrestrictedHTTPClient
282282

283-
promForwarder := promotel.NewForwarder(globalLogger, prometheus.DefaultGatherer, otel.GetMeterProvider())
284-
srvcs = append(srvcs, promForwarder)
283+
if beholderClient := beholder.GetClient(); beholderClient != nil {
284+
forwarderOpts := promotel.DefaultOptions()
285+
forwarderOpts.Endpoint = beholderClient.Config.OtelExporterGRPCEndpoint
286+
forwarderOpts.TLSInsecure = beholderClient.Config.InsecureConnection
287+
forwarderOpts.AuthHeaders = beholderClient.Config.AuthHeaders
288+
promForwarder, err := promotel.NewForwarderService(prometheus.DefaultGatherer, opts.Registerer, globalLogger, forwarderOpts)
289+
if err != nil {
290+
return nil, fmt.Errorf("could not create prometheus forwarder: %w", err)
291+
}
292+
srvcs = append(srvcs, promForwarder)
293+
}
285294

286295
if opts.CapabilitiesRegistry == nil {
287296
// for tests only, in prod Registry should always be set at this point

core/services/promotel/promotel.go

Lines changed: 23 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -2,187 +2,54 @@ package promotel
22

33
import (
44
"context"
5-
"time"
65

76
"github.com/prometheus/client_golang/prometheus"
8-
"github.com/prometheus/client_golang/prometheus/promauto"
9-
"go.opentelemetry.io/collector/consumer"
10-
"go.opentelemetry.io/collector/pdata/pmetric"
11-
"go.opentelemetry.io/otel/metric"
12-
"go.uber.org/zap"
137

148
"github.com/smartcontractkit/chainlink-common/pkg/logger"
159
"github.com/smartcontractkit/chainlink-common/pkg/services"
16-
"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
1710

1811
promotelcommon "github.com/smartcontractkit/chainlink-common/pkg/promotel"
1912
)
2013

2114
const (
22-
period = 15 * time.Second
23-
heartbeatMetricName = "promotel_heartbeat"
15+
name = "PromOTELForwarder"
2416
)
2517

26-
type Forwarder struct {
18+
type Options = promotelcommon.ForwarderOptions
19+
type ForwarderService struct {
2720
services.StateMachine
28-
lggr logger.Logger
29-
gatherer prometheus.Gatherer
30-
meterProvider metric.MeterProvider
31-
stopCh services.StopChan
32-
done chan struct{}
21+
lggr logger.Logger
22+
forwarder *promotelcommon.Forwarder
3323
}
3424

35-
func NewForwarder(lggr logger.Logger, gatherer prometheus.Gatherer, meterProvider metric.MeterProvider) *Forwarder {
36-
return &Forwarder{
37-
lggr: logger.Named(lggr, "PromOTELForwarder"),
38-
gatherer: gatherer,
39-
meterProvider: meterProvider,
40-
stopCh: make(chan struct{}),
41-
done: make(chan struct{}),
42-
}
43-
}
44-
45-
func (f *Forwarder) HealthReport() map[string]error { return map[string]error{f.Name(): f.Healthy()} }
46-
47-
func (f *Forwarder) Name() string { return f.lggr.Name() }
48-
49-
func (f *Forwarder) Start(context.Context) error {
50-
go reportHertbeatMetric(prometheus.DefaultRegisterer, f.lggr, 1*time.Second)
51-
52-
go f.run()
53-
return nil
54-
}
55-
56-
func (f *Forwarder) run() {
57-
58-
defer close(f.done)
59-
ctx, cancel := f.stopCh.NewCtx()
60-
defer cancel()
61-
ticker := timeutil.NewTicker(func() time.Duration { return period })
62-
defer ticker.Stop()
63-
64-
go reportHertbeatMetric(prometheus.DefaultRegisterer, f.lggr, 1*time.Second)
65-
66-
exporter := startExporter(ctx, f.lggr)
67-
68-
// Fetches metrics from in memory prometheus.Gatherer and converts to OTel format
69-
receiver := f.startMetricReceiver(func(ctx context.Context, md pmetric.Metrics) error {
70-
// todo: remove or make configurable
71-
f.logOtelMetric(md, f.lggr)
72-
73-
// Exports the converted OTel metric
74-
return exporter.Consumer().ConsumeMetrics(ctx, md)
75-
})
76-
// Close the receiver and exporter when the forwarder is closed
77-
defer func() {
78-
if err := receiver.Close(); err != nil {
79-
f.lggr.Error("Failed to close scraper", zap.Error(err))
80-
}
81-
if err := exporter.Close(); err != nil {
82-
f.lggr.Error("Failed to close exporter", zap.Error(err))
83-
}
84-
}()
85-
86-
for {
87-
select {
88-
case <-ctx.Done():
89-
return
90-
case <-ticker.C:
91-
f.forward(ctx)
92-
}
93-
}
94-
}
95-
96-
func (f *Forwarder) forward(ctx context.Context) {
97-
mfs, err := f.gatherer.Gather()
25+
func NewForwarderService(g prometheus.Gatherer, r prometheus.Registerer, lggr logger.Logger, opts Options) (*ForwarderService, error) {
26+
l := logger.Named(lggr, name)
27+
forwarder, err := promotelcommon.NewForwarder(g, r, l, opts)
9828
if err != nil {
99-
f.lggr.Errorw("Failed to gather prometheus metrics", "err", err)
100-
}
101-
for _, mf := range mfs {
102-
for range mf.Metric {
103-
if ctx.Err() != nil {
104-
return
105-
}
106-
107-
//TODO f.meterProvider.Meter()
108-
}
29+
return nil, err
10930
}
31+
return &ForwarderService{
32+
lggr: l,
33+
forwarder: forwarder,
34+
}, nil
11035
}
11136

112-
func (f *Forwarder) Close() error {
113-
close(f.stopCh)
114-
<-f.done
115-
return nil
37+
func (f *ForwarderService) HealthReport() map[string]error {
38+
return map[string]error{f.Name(): f.Healthy()}
11639
}
11740

118-
func reportHertbeatMetric(reg prometheus.Registerer, lggr logger.Logger, interval time.Duration) {
119-
heartbeat := promauto.With(reg).NewCounter(prometheus.CounterOpts{
120-
Name: heartbeatMetricName,
121-
ConstLabels: prometheus.Labels{
122-
"source": "promotel",
123-
},
124-
})
125-
for {
126-
heartbeat.Inc()
127-
lggr.Debugw("Heartbeat promotel")
128-
time.Sleep(interval)
129-
}
130-
}
41+
func (f *ForwarderService) Name() string { return f.lggr.Name() }
13142

132-
func startExporter(ctx context.Context, logger logger.Logger) promotelcommon.MetricExporter {
133-
expConfig, err := promotelcommon.NewExporterConfig(map[string]any{
134-
"endpoint": "localhost:4317",
135-
"tls": map[string]any{
136-
"insecure": true,
137-
},
43+
func (f *ForwarderService) Start(ctx context.Context) error {
44+
return f.StartOnce(name, func() error {
45+
return f.forwarder.Start(ctx)
13846
})
139-
if err != nil {
140-
logger.Fatal("Failed to create exporter config", zap.Error(err))
141-
}
142-
// Sends metrics data in OTLP format to otel-collector endpoint
143-
exporter, err := promotelcommon.NewMetricExporter(expConfig, logger)
144-
if err != nil {
145-
logger.Fatal("Failed to create metric exporter", zap.Error(err))
146-
}
147-
err = exporter.Start(ctx)
148-
if err != nil {
149-
logger.Fatal("Failed to start exporter", zap.Error(err))
150-
}
151-
return exporter
15247
}
15348

154-
func (f *Forwarder) startMetricReceiver(next consumer.ConsumeMetricsFunc) promotelcommon.Runnable {
155-
f.lggr.Info("Starting promotel metric receiver")
156-
config, err := promotelcommon.NewDefaultReceiverConfig()
157-
if err != nil {
158-
f.lggr.Fatal("Failed to create config", zap.Error(err))
159-
}
160-
161-
// Gather metrics via promotel
162-
// MetricReceiver fetches metrics from prometheus.Gatherer, then converts it to OTel format and writes formatted metrics to stdout
163-
receiver, err := promotelcommon.NewMetricReceiver(config, f.gatherer, next, f.lggr)
164-
if err != nil {
165-
f.lggr.Fatal("Failed to create debug metric receiver", zap.Error(err))
166-
}
167-
// Starts the promotel
168-
if err := receiver.Start(context.Background()); err != nil {
169-
f.lggr.Fatal("Failed to start metric receiver", zap.Error(err))
170-
}
171-
return receiver
49+
func (f *ForwarderService) Close() error {
50+
return f.StopOnce(name, f.forwarder.Close)
17251
}
17352

174-
func (f *Forwarder) logOtelMetric(md pmetric.Metrics, logger logger.Logger) {
175-
rms := md.ResourceMetrics()
176-
for i := 0; i < rms.Len(); i++ {
177-
rm := rms.At(i)
178-
ilms := rm.ScopeMetrics()
179-
for j := 0; j < ilms.Len(); j++ {
180-
ilm := ilms.At(j)
181-
metrics := ilm.Metrics()
182-
for k := 0; k < metrics.Len(); k++ {
183-
metric := metrics.At(k)
184-
logger.Debug("Exporting OTel metric ", zap.Any("name", metric.Name()), zap.Any("value", metric.Sum().DataPoints().At(0).DoubleValue()))
185-
}
186-
}
187-
}
53+
func DefaultOptions() Options {
54+
return promotelcommon.DefaultForwarderOptions()
18855
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package promotel
2+
3+
import (
4+
"context"
5+
"strings"
6+
"testing"
7+
"time"
8+
9+
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/prometheus/client_golang/prometheus/promauto"
11+
"github.com/smartcontractkit/wsrpc/logger"
12+
"github.com/stretchr/testify/require"
13+
"go.uber.org/zap"
14+
)
15+
16+
func TestForwarder(t *testing.T) {
17+
var (
18+
g = prometheus.DefaultGatherer
19+
r = prometheus.DefaultRegisterer
20+
lggr, observed = logger.TestObserved(t, zap.DebugLevel)
21+
testMetricName = t.Name() + "_test_counter_metric"
22+
interval = 10 * time.Millisecond
23+
)
24+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
25+
defer cancel()
26+
27+
go reportTestMetrics(ctx, r, testMetricName)
28+
29+
doneCh := make(chan struct{})
30+
go func() {
31+
for {
32+
select {
33+
case <-ctx.Done():
34+
return
35+
default:
36+
for _, l := range observed.All() {
37+
metricName, ok := l.ContextMap()["name"].(string)
38+
if ok && strings.Contains(metricName, testMetricName) {
39+
doneCh <- struct{}{}
40+
}
41+
}
42+
time.Sleep(1 * time.Second)
43+
}
44+
}
45+
}()
46+
47+
forwarder, err := NewForwarderService(g, r, lggr, Options{
48+
Endpoint: "localhost:4317",
49+
TLSInsecure: true,
50+
Interval: interval,
51+
Verbose: true,
52+
})
53+
require.NoError(t, err)
54+
require.NoError(t, forwarder.Start(ctx))
55+
defer forwarder.Close()
56+
57+
select {
58+
case <-ctx.Done():
59+
t.Fatal("Test timed out. Expected metric not found")
60+
case <-doneCh:
61+
t.Log("Found metric.")
62+
}
63+
}
64+
65+
func reportTestMetrics(ctx context.Context, reg prometheus.Registerer, metricName string) {
66+
m := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: metricName})
67+
for {
68+
select {
69+
case <-ctx.Done():
70+
return
71+
default:
72+
m.Inc()
73+
time.Sleep(1 * time.Second)
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)