Skip to content

Commit aed0034

Browse files
authored
OCR3 beholderwrapper for don-time (#21832)
* Initial draft * Fix lint
1 parent eb25fa1 commit aed0034

10 files changed

Lines changed: 689 additions & 123 deletions

File tree

core/cmd/shell.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import (
5454
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
5555
"github.com/smartcontractkit/chainlink/v2/core/services/llo"
5656
"github.com/smartcontractkit/chainlink/v2/core/services/llo/retirement"
57+
ocr3beholderwrapper "github.com/smartcontractkit/chainlink/v2/core/services/ocr3/beholderwrapper"
5758
ocr3_1beholderwrapper "github.com/smartcontractkit/chainlink/v2/core/services/ocr3_1/beholderwrapper"
5859
"github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup"
5960
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
@@ -74,7 +75,12 @@ var (
7475
)
7576

7677
func metricViews() []sdkmetric.View {
77-
return slices.Concat(workflowsmonitoring.MetricViews(), ccvcommon.MetricViews(), ocr3_1beholderwrapper.MetricViews())
78+
return slices.Concat(
79+
workflowsmonitoring.MetricViews(),
80+
ccvcommon.MetricViews(),
81+
ocr3beholderwrapper.MetricViews(),
82+
ocr3_1beholderwrapper.MetricViews(),
83+
)
7884
}
7985

8086
func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTelemetry config.Telemetry, lggr logger.Logger, csaPubKeyHex string, beholderAuthHeaders map[string]string) error {

core/services/ocr2/delegate.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ import (
9292
ringconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ring/config"
9393
vaultocrplugin "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/vault"
9494
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/validate"
95+
ocr3beholderwrapper "github.com/smartcontractkit/chainlink/v2/core/services/ocr3/beholderwrapper"
9596
"github.com/smartcontractkit/chainlink/v2/core/services/ocr3_1/beholderwrapper"
9697
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
9798
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
@@ -1040,10 +1041,15 @@ func (d *Delegate) newDonTimePlugin(
10401041
OnchainKeyring: onchainKeyringAdapter,
10411042
MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": jb.Name.ValueOrZero()}, prometheus.DefaultRegisterer),
10421043
}
1043-
oracleArgs.ReportingPluginFactory, err = dontime.NewFactory(d.dontimeStore, lggr.Named("DonTimePluginFactory"))
1044+
baseFactory, err := dontime.NewFactory(d.dontimeStore, lggr.Named("DonTimePluginFactory"))
10441045
if err != nil {
10451046
return nil, err
10461047
}
1048+
oracleArgs.ReportingPluginFactory = ocr3beholderwrapper.NewReportingPluginFactory(
1049+
baseFactory,
1050+
lggr,
1051+
"dontime",
1052+
)
10471053

10481054
oracle, err := libocr2.NewOracle(oracleArgs)
10491055
if err != nil {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package beholderwrapper
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
8+
9+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
10+
11+
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
12+
13+
"github.com/smartcontractkit/chainlink/v2/core/services/ocr3/beholderwrapper/metrics"
14+
)
15+
16+
var _ ocr3types.ReportingPluginFactory[any] = &ReportingPluginFactory[any]{}
17+
18+
type ReportingPluginFactory[RI any] struct {
19+
wrapped ocr3types.ReportingPluginFactory[RI]
20+
lggr logger.Logger
21+
plugin string
22+
}
23+
24+
func NewReportingPluginFactory[RI any](
25+
wrapped ocr3types.ReportingPluginFactory[RI],
26+
lggr logger.Logger,
27+
plugin string,
28+
) *ReportingPluginFactory[RI] {
29+
return &ReportingPluginFactory[RI]{
30+
wrapped: wrapped,
31+
lggr: lggr,
32+
plugin: plugin,
33+
}
34+
}
35+
36+
func (r ReportingPluginFactory[RI]) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[RI], ocr3types.ReportingPluginInfo, error) {
37+
plugin, info, err := r.wrapped.NewReportingPlugin(ctx, config)
38+
if err != nil {
39+
return nil, ocr3types.ReportingPluginInfo{}, err
40+
}
41+
42+
m, err := metrics.NewPluginMetrics(MetricPrefix, r.plugin, config.ConfigDigest.String())
43+
if err != nil {
44+
return nil, ocr3types.ReportingPluginInfo{}, fmt.Errorf("failed to create plugin metrics: %w", err)
45+
}
46+
47+
r.lggr.Infow("Wrapping OCR3 ReportingPlugin with beholder metrics reporter",
48+
"configDigest", config.ConfigDigest,
49+
"oracleID", config.OracleID,
50+
)
51+
52+
wrappedPlugin := newReportingPlugin(plugin, m)
53+
return wrappedPlugin, info, nil
54+
}
55+
56+
// MetricViews returns the histogram bucket views for registration with beholder
57+
func MetricViews() []sdkmetric.View {
58+
return metrics.MetricViews(MetricPrefix)
59+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package beholderwrapper
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/stretchr/testify/require"
9+
10+
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
11+
12+
"github.com/smartcontractkit/chainlink/v2/core/logger"
13+
)
14+
15+
func Test_WrapperFactory(t *testing.T) {
16+
validFactory := NewReportingPluginFactory[uint](
17+
&fakeFactory[uint]{},
18+
logger.TestLogger(t),
19+
"plugin",
20+
)
21+
failingFactory := NewReportingPluginFactory[uint](
22+
&fakeFactory[uint]{err: errors.New("error")},
23+
logger.TestLogger(t),
24+
"plugin",
25+
)
26+
27+
plugin, _, err := validFactory.NewReportingPlugin(t.Context(), ocr3types.ReportingPluginConfig{})
28+
require.NoError(t, err)
29+
30+
// Verify the wrapped plugin works
31+
_, err = plugin.Outcome(t.Context(), ocr3types.OutcomeContext{}, nil, nil)
32+
require.NoError(t, err)
33+
34+
_, _, err = failingFactory.NewReportingPlugin(t.Context(), ocr3types.ReportingPluginConfig{})
35+
require.Error(t, err)
36+
}
37+
38+
func Test_MetricViews(t *testing.T) {
39+
views := MetricViews()
40+
require.Len(t, views, 2)
41+
}
42+
43+
type fakeFactory[RI any] struct {
44+
err error
45+
}
46+
47+
func (f *fakeFactory[RI]) NewReportingPlugin(context.Context, ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[RI], ocr3types.ReportingPluginInfo, error) {
48+
if f.err != nil {
49+
return nil, ocr3types.ReportingPluginInfo{}, f.err
50+
}
51+
return &fakePlugin[RI]{}, ocr3types.ReportingPluginInfo{}, nil
52+
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package metrics
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"time"
8+
9+
"github.com/prometheus/client_golang/prometheus"
10+
"go.opentelemetry.io/otel/attribute"
11+
"go.opentelemetry.io/otel/metric"
12+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
13+
14+
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
15+
)
16+
17+
// FunctionType represents the OCR plugin function being measured
18+
type FunctionType string
19+
20+
const (
21+
Query FunctionType = "query"
22+
Observation FunctionType = "observation"
23+
ValidateObservation FunctionType = "validateObservation"
24+
// OCR3 specific
25+
Outcome FunctionType = "outcome"
26+
// OCR3.1 specific
27+
ObservationQuorum FunctionType = "observationQuorum"
28+
StateTransition FunctionType = "stateTransition"
29+
Committed FunctionType = "committed"
30+
// Common
31+
Reports FunctionType = "reports"
32+
ShouldAccept FunctionType = "shouldAccept"
33+
ShouldTransmit FunctionType = "shouldTransmit"
34+
)
35+
36+
// PluginMetrics holds OTEL metrics for OCR plugin instrumentation
37+
type PluginMetrics struct {
38+
plugin string
39+
configDigest string
40+
41+
durations metric.Int64Histogram
42+
reportsGenerated metric.Int64Counter
43+
sizes metric.Int64Histogram
44+
status metric.Int64Gauge
45+
}
46+
47+
// NewPluginMetrics creates metrics with the given prefix (e.g., "platform_ocr3_reporting_plugin" or "platform_ocr3_1_reporting_plugin")
48+
func NewPluginMetrics(metricPrefix, plugin, configDigest string) (*PluginMetrics, error) {
49+
durations, err := beholder.GetMeter().Int64Histogram(metricPrefix+"_duration_ms", metric.WithUnit("ms"))
50+
if err != nil {
51+
return nil, fmt.Errorf("failed to create duration histogram: %w", err)
52+
}
53+
54+
reportsGenerated, err := beholder.GetMeter().Int64Counter(metricPrefix+"_reports_processed", metric.WithUnit("1"))
55+
if err != nil {
56+
return nil, fmt.Errorf("failed to create reports counter: %w", err)
57+
}
58+
59+
sizes, err := beholder.GetMeter().Int64Histogram(metricPrefix+"_data_sizes", metric.WithUnit("By"))
60+
if err != nil {
61+
return nil, fmt.Errorf("failed to create sizes histogram: %w", err)
62+
}
63+
64+
status, err := beholder.GetMeter().Int64Gauge(metricPrefix + "_status")
65+
if err != nil {
66+
return nil, fmt.Errorf("failed to create status gauge: %w", err)
67+
}
68+
69+
return &PluginMetrics{
70+
plugin: plugin,
71+
configDigest: configDigest,
72+
durations: durations,
73+
reportsGenerated: reportsGenerated,
74+
sizes: sizes,
75+
status: status,
76+
}, nil
77+
}
78+
79+
// RecordDuration records the duration of a function execution
80+
func (m *PluginMetrics) RecordDuration(ctx context.Context, function FunctionType, d time.Duration, success bool) {
81+
m.durations.Record(ctx, d.Milliseconds(), metric.WithAttributes(
82+
attribute.String("plugin", m.plugin),
83+
attribute.String("function", string(function)),
84+
attribute.String("success", strconv.FormatBool(success)),
85+
attribute.String("configDigest", m.configDigest),
86+
))
87+
}
88+
89+
// TrackReports increments the reports processed counter
90+
func (m *PluginMetrics) TrackReports(ctx context.Context, function FunctionType, count int, success bool) {
91+
m.reportsGenerated.Add(ctx, int64(count), metric.WithAttributes(
92+
attribute.String("plugin", m.plugin),
93+
attribute.String("function", string(function)),
94+
attribute.String("success", strconv.FormatBool(success)),
95+
attribute.String("configDigest", m.configDigest),
96+
))
97+
}
98+
99+
// TrackSize records the size of data produced
100+
func (m *PluginMetrics) TrackSize(ctx context.Context, function FunctionType, size int) {
101+
m.sizes.Record(ctx, int64(size), metric.WithAttributes(
102+
attribute.String("plugin", m.plugin),
103+
attribute.String("function", string(function)),
104+
attribute.String("configDigest", m.configDigest),
105+
))
106+
}
107+
108+
// UpdateStatus updates the plugin status gauge (1 = up, 0 = down)
109+
func (m *PluginMetrics) UpdateStatus(ctx context.Context, up bool) {
110+
val := int64(0)
111+
if up {
112+
val = 1
113+
}
114+
m.status.Record(ctx, val, metric.WithAttributes(
115+
attribute.String("plugin", m.plugin),
116+
attribute.String("configDigest", m.configDigest),
117+
))
118+
}
119+
120+
// MetricViews returns histogram bucket definitions for the given metric prefix.
121+
// Note: due to the OTEL specification, all histogram buckets must be defined when the beholder client is created.
122+
func MetricViews(metricPrefix string) []sdkmetric.View {
123+
return []sdkmetric.View{
124+
sdkmetric.NewView(
125+
sdkmetric.Instrument{Name: metricPrefix + "_duration_ms"},
126+
sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
127+
// 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480, 40960
128+
Boundaries: prometheus.ExponentialBuckets(5, 2, 14),
129+
}},
130+
),
131+
sdkmetric.NewView(
132+
sdkmetric.Instrument{Name: metricPrefix + "_data_sizes"},
133+
sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
134+
// 1KB, 2KB, 4KB, 8KB, 16KB, 32KB, 64KB, 128KB, 256KB, 512KB, 1024KB, 2048KB, 4096KB, 8192KB
135+
Boundaries: prometheus.ExponentialBuckets(1024, 2, 14),
136+
}},
137+
),
138+
}
139+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package metrics
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestNewPluginMetrics(t *testing.T) {
12+
metrics, err := NewPluginMetrics("platform_ocr3_reporting_plugin", "test-plugin", "abc123")
13+
require.NoError(t, err)
14+
require.NotNil(t, metrics)
15+
}
16+
17+
func TestRecordDuration(t *testing.T) {
18+
metrics, err := NewPluginMetrics("platform_ocr3_reporting_plugin", "test-plugin", "abc123")
19+
require.NoError(t, err)
20+
21+
// Should not panic and should complete without error
22+
metrics.RecordDuration(context.Background(), Query, 100*time.Millisecond, true)
23+
metrics.RecordDuration(context.Background(), Observation, 200*time.Millisecond, false)
24+
metrics.RecordDuration(context.Background(), ValidateObservation, 50*time.Millisecond, true)
25+
metrics.RecordDuration(context.Background(), Outcome, 150*time.Millisecond, true)
26+
metrics.RecordDuration(context.Background(), Reports, 75*time.Millisecond, true)
27+
metrics.RecordDuration(context.Background(), ShouldAccept, 10*time.Millisecond, true)
28+
metrics.RecordDuration(context.Background(), ShouldTransmit, 5*time.Millisecond, false)
29+
}
30+
31+
func TestTrackReports(t *testing.T) {
32+
metrics, err := NewPluginMetrics("platform_ocr3_reporting_plugin", "test-plugin", "abc123")
33+
require.NoError(t, err)
34+
35+
// Should not panic and should complete without error
36+
metrics.TrackReports(context.Background(), Reports, 5, true)
37+
metrics.TrackReports(context.Background(), ShouldAccept, 1, true)
38+
metrics.TrackReports(context.Background(), ShouldTransmit, 0, false)
39+
}
40+
41+
func TestTrackSize(t *testing.T) {
42+
metrics, err := NewPluginMetrics("platform_ocr3_reporting_plugin", "test-plugin", "abc123")
43+
require.NoError(t, err)
44+
45+
// Should not panic and should complete without error
46+
metrics.TrackSize(context.Background(), Observation, 1024)
47+
metrics.TrackSize(context.Background(), Outcome, 2048)
48+
}
49+
50+
func TestUpdateStatus(t *testing.T) {
51+
metrics, err := NewPluginMetrics("platform_ocr3_reporting_plugin", "test-plugin", "abc123")
52+
require.NoError(t, err)
53+
54+
// Should not panic and should complete without error
55+
metrics.UpdateStatus(context.Background(), true)
56+
metrics.UpdateStatus(context.Background(), false)
57+
}
58+
59+
func TestMetricViews(t *testing.T) {
60+
views := MetricViews("platform_ocr3_reporting_plugin")
61+
require.Len(t, views, 2)
62+
}
63+
64+
func TestFunctionTypeConstants(t *testing.T) {
65+
// Verify all expected function types exist
66+
require.Equal(t, Query, FunctionType("query"))
67+
require.Equal(t, Observation, FunctionType("observation"))
68+
require.Equal(t, ValidateObservation, FunctionType("validateObservation"))
69+
require.Equal(t, Outcome, FunctionType("outcome"))
70+
require.Equal(t, ObservationQuorum, FunctionType("observationQuorum"))
71+
require.Equal(t, StateTransition, FunctionType("stateTransition"))
72+
require.Equal(t, Committed, FunctionType("committed"))
73+
require.Equal(t, Reports, FunctionType("reports"))
74+
require.Equal(t, ShouldAccept, FunctionType("shouldAccept"))
75+
require.Equal(t, ShouldTransmit, FunctionType("shouldTransmit"))
76+
}

0 commit comments

Comments
 (0)