Skip to content

Commit a3174e7

Browse files
author
Larry Li
committed
publish two basic metrics from the RPC client into Beholder: RPC latency, RPC error rate
1 parent 3ee91a6 commit a3174e7

6 files changed

Lines changed: 237 additions & 9 deletions

File tree

docs/rpc_observability.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# RPC Observability (Beholder)
2+
3+
RPC client metrics are published to Beholder and surface in Prometheus/Grafana when `RPCClientBase` is constructed with a non-nil `metrics.RPCClientMetrics`.
4+
5+
## Metrics
6+
7+
| Metric | Type | Description |
8+
|--------|------|-------------|
9+
| `rpc_request_latency_ms` | Histogram | RPC call latency in milliseconds (per call) |
10+
| `rpc_request_errors_total` | Counter | Total number of failed RPC requests (a call that returns an invalid head is counted as failed) |
11+
12+
Labels (on both metrics): `env`, `network`, `chain_id`, `rpc_provider`, and `call`. The `call` label names the RPC operation (e.g. `latest_block`, `latest_finalized_block`).
13+
14+
## Example Prometheus / Grafana Queries
15+
16+
### Latency over time
17+
18+
- **p99 latency by env and chain:**
19+
```promql
20+
histogram_quantile(0.99, sum(rate(rpc_request_latency_ms_bucket[5m])) by (le, env, network, chain_id))
21+
```
22+
- **p50 latency for a given environment:**
23+
```promql
24+
histogram_quantile(0.5, sum(rate(rpc_request_latency_ms_bucket{env="staging"}[5m])) by (le, network, chain_id))
25+
```
26+
27+
### Error rate over time
28+
29+
- **Errors per second by env and chain:**
30+
```promql
31+
sum(rate(rpc_request_errors_total[5m])) by (env, network, chain_id, rpc_provider)
32+
```
33+
- **Error rate for a specific RPC provider:**
34+
```promql
35+
sum(rate(rpc_request_errors_total{rpc_provider="primary"}[5m])) by (env, network, chain_id)
36+
```
37+
38+
### Request rate
39+
40+
- **Requests per second by call type:**
41+
```promql
42+
sum(rate(rpc_request_latency_ms_count[5m])) by (call, env, network)
43+
```
44+
45+
## Enabling metrics
46+
47+
Create an `RPCClientMetrics` with `metrics.NewRPCClientMetrics(metrics.RPCClientMetricsConfig{...})` and pass it as the last argument to `multinode.NewRPCClientBase(...)`. Passing `nil` disables metric recording. A follow-up interface refactor will make it easier for multinode/chain integrations to supply `env`, `network`, `chain_id`, and `rpc_provider`.

metrics/rpc_client.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// RPC client observability using Beholder.
2+
//
3+
// This file defines rpc_request_latency_ms and rpc_request_errors_total, emitted
4+
// from the RPC client when RPCClientBase is constructed with a non-nil RPCClientMetrics.
5+
// Metrics are queryable in Prometheus/Grafana by env, network, chain_id, rpc_provider, and call.
6+
//
7+
// Example Prometheus/Grafana queries:
8+
//
9+
// - Latency over time (e.g. p99 by env and chain):
10+
// histogram_quantile(0.99, sum(rate(rpc_request_latency_ms_bucket[5m])) by (le, env, network, chain_id))
11+
//
12+
// - Error rate over time (errors per second by env and chain):
13+
// sum(rate(rpc_request_errors_total[5m])) by (env, network, chain_id, rpc_provider)
14+
//
15+
// - Request rate by call type:
16+
// sum(rate(rpc_request_latency_ms_count[5m])) by (call, env, network)
17+
package metrics
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
"github.com/prometheus/client_golang/prometheus"
24+
"github.com/prometheus/client_golang/prometheus/promauto"
25+
"go.opentelemetry.io/otel/attribute"
26+
"go.opentelemetry.io/otel/metric"
27+
28+
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
29+
)
30+
31+
// RPCRequestLatencyMs names the Beholder/Prometheus histogram that tracks
// per-call RPC latency, measured in milliseconds.
const RPCRequestLatencyMs = "rpc_request_latency_ms"

// RPCRequestErrorsTotal names the Beholder/Prometheus counter that tracks the
// total number of failed RPC calls.
const RPCRequestErrorsTotal = "rpc_request_errors_total"
37+
38+
var (
39+
rpcRequestLatencyBuckets = []float64{
40+
5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 30000,
41+
}
42+
promRPCRequestLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
43+
Name: RPCRequestLatencyMs,
44+
Help: "RPC request latency in milliseconds (per call)",
45+
Buckets: rpcRequestLatencyBuckets,
46+
}, []string{"env", "network", "chain_id", "rpc_provider", "call"})
47+
promRPCRequestErrors = promauto.NewCounterVec(prometheus.CounterOpts{
48+
Name: RPCRequestErrorsTotal,
49+
Help: "Total number of failed RPC requests",
50+
}, []string{"env", "network", "chain_id", "rpc_provider", "call"})
51+
)
52+
53+
// RPCClientMetrics records RPC latency and error metrics for observability via Beholder/Prometheus.
54+
// Metrics are queryable by environment, network, chain_id, rpc_provider, and call in Grafana.
55+
type RPCClientMetrics interface {
56+
// RecordRequest records latency for a single RPC call. If err is non-nil, also increments the error counter.
57+
// callName identifies the operation (e.g. "latest_block", "latest_finalized_block").
// latencyMs is the duration of the call expressed in milliseconds.
58+
RecordRequest(ctx context.Context, callName string, latencyMs float64, err error)
59+
}
60+
61+
// Compile-time check that *rpcClientMetrics satisfies RPCClientMetrics.
var _ RPCClientMetrics = (*rpcClientMetrics)(nil)
62+
63+
// rpcClientMetrics is the RPCClientMetrics implementation returned by
// NewRPCClientMetrics. It carries the fixed label values copied from
// RPCClientMetricsConfig together with the Beholder instruments.
type rpcClientMetrics struct {
64+
// env is emitted as the "env" label.
env string
65+
// network is emitted as the "network" label.
network string
66+
// chainID is emitted as the "chain_id" label.
chainID string
67+
// rpcProvider is emitted as the "rpc_provider" label.
rpcProvider string
68+
// latency is the Beholder histogram registered under RPCRequestLatencyMs.
latency metric.Float64Histogram
69+
// errorsTotal is the Beholder counter registered under RPCRequestErrorsTotal.
errorsTotal metric.Int64Counter
70+
}
71+
72+
// RPCClientMetricsConfig holds the label values attached to every RPC client metric sample.
73+
// Empty strings are allowed; they will still be emitted as labels for filtering.
74+
type RPCClientMetricsConfig struct {
75+
Env string // value of the "env" label, e.g. "staging", "production"
76+
Network string // value of the "network" label: chain/network name
77+
ChainID string // value of the "chain_id" label: chain ID
78+
RPCProvider string // value of the "rpc_provider" label: RPC provider or node name (optional)
79+
}
80+
81+
// NewRPCClientMetrics creates RPC client metrics that publish to Beholder and Prometheus.
82+
// Callers (e.g. chain-specific RPC clients or multinode) should pass env, network, chainID, and optionally rpcProvider
83+
// so metrics can be queried in Grafana by environment, chain/network, and RPC provider.
84+
func NewRPCClientMetrics(cfg RPCClientMetricsConfig) (RPCClientMetrics, error) {
85+
latency, err := beholder.GetMeter().Float64Histogram(RPCRequestLatencyMs)
86+
if err != nil {
87+
return nil, fmt.Errorf("failed to register RPC request latency metric: %w", err)
88+
}
89+
errorsTotal, err := beholder.GetMeter().Int64Counter(RPCRequestErrorsTotal)
90+
if err != nil {
91+
return nil, fmt.Errorf("failed to register RPC request errors metric: %w", err)
92+
}
93+
return &rpcClientMetrics{
94+
env: cfg.Env,
95+
network: cfg.Network,
96+
chainID: cfg.ChainID,
97+
rpcProvider: cfg.RPCProvider,
98+
latency: latency,
99+
errorsTotal: errorsTotal,
100+
}, nil
101+
}
102+
103+
func (m *rpcClientMetrics) RecordRequest(ctx context.Context, callName string, latencyMs float64, err error) {
104+
attrs := metric.WithAttributes(
105+
attribute.String("env", m.env),
106+
attribute.String("network", m.network),
107+
attribute.String("chain_id", m.chainID),
108+
attribute.String("rpc_provider", m.rpcProvider),
109+
attribute.String("call", callName),
110+
)
111+
promRPCRequestLatency.WithLabelValues(m.env, m.network, m.chainID, m.rpcProvider, callName).Observe(latencyMs)
112+
m.latency.Record(ctx, latencyMs, attrs)
113+
if err != nil {
114+
promRPCRequestErrors.WithLabelValues(m.env, m.network, m.chainID, m.rpcProvider, callName).Inc()
115+
m.errorsTotal.Add(ctx, 1, attrs)
116+
}
117+
}
118+
119+
// NoopRPCClientMetrics is a no-op implementation for when metrics are disabled.
120+
type NoopRPCClientMetrics struct{}
121+
122+
// RecordRequest ignores all of its arguments and does nothing.
func (NoopRPCClientMetrics) RecordRequest(context.Context, string, float64, error) {}
123+
124+
// Ensure NoopRPCClientMetrics implements RPCClientMetrics.
125+
var _ RPCClientMetrics = NoopRPCClientMetrics{}

metrics/rpc_client_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package metrics
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestNewRPCClientMetrics(t *testing.T) {
12+
m, err := NewRPCClientMetrics(RPCClientMetricsConfig{
13+
Env: "staging",
14+
Network: "ethereum",
15+
ChainID: "1",
16+
RPCProvider: "primary",
17+
})
18+
require.NoError(t, err)
19+
require.NotNil(t, m)
20+
21+
ctx := context.Background()
22+
m.RecordRequest(ctx, "latest_block", 100.0, nil)
23+
m.RecordRequest(ctx, "latest_block", 50.0, errors.New("rpc error"))
24+
}
25+
26+
func TestNoopRPCClientMetrics_RecordRequest(t *testing.T) {
27+
var m NoopRPCClientMetrics
28+
ctx := context.Background()
29+
m.RecordRequest(ctx, "latest_block", 100.0, nil)
30+
m.RecordRequest(ctx, "latest_block", 50.0, errors.New("rpc error"))
31+
// Noop should not panic
32+
}

multinode/go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ module github.com/smartcontractkit/chainlink-framework/multinode
22

33
go 1.25.3
44

5+
replace github.com/smartcontractkit/chainlink-framework/metrics => ../metrics
6+
57
require (
68
github.com/jpillora/backoff v1.0.0
79
github.com/pkg/errors v0.9.1

multinode/rpc_client_base.go

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99

1010
"github.com/smartcontractkit/chainlink-common/pkg/logger"
1111
"github.com/smartcontractkit/chainlink-common/pkg/services"
12+
13+
"github.com/smartcontractkit/chainlink-framework/metrics"
1214
)
1315

1416
type RPCClientBaseConfig interface {
@@ -46,12 +48,18 @@ type RPCClientBase[HEAD Head] struct {
4648
highestUserObservations ChainInfo
4749
// most recent chain info observed during current lifecycle
4850
latestChainInfo ChainInfo
51+
52+
// rpcMetrics is optional; when set, RPC latency and errors are reported to Beholder/Prometheus.
53+
rpcMetrics metrics.RPCClientMetrics
4954
}
5055

56+
// NewRPCClientBase creates an RPC client base. rpcMetrics is optional; when non-nil,
57+
// latency and error metrics are emitted to Beholder for observability in Prometheus/Grafana.
5158
func NewRPCClientBase[HEAD Head](
5259
cfg RPCClientBaseConfig, ctxTimeout time.Duration, log logger.Logger,
5360
latestBlock func(ctx context.Context) (HEAD, error),
5461
latestFinalizedBlock func(ctx context.Context) (HEAD, error),
62+
rpcMetrics metrics.RPCClientMetrics,
5563
) *RPCClientBase[HEAD] {
5664
return &RPCClientBase[HEAD]{
5765
cfg: cfg,
@@ -61,6 +69,7 @@ func NewRPCClientBase[HEAD Head](
6169
latestFinalizedBlock: latestFinalizedBlock,
6270
subs: make(map[Subscription]struct{}),
6371
lifeCycleCh: make(chan struct{}),
72+
rpcMetrics: rpcMetrics,
6473
}
6574
}
6675

@@ -151,20 +160,29 @@ func (m *RPCClientBase[HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-
151160
return channel, sub, nil
152161
}
153162

163+
// callLatestBlock is the call name reported to rpcMetrics by LatestBlock.
const callLatestBlock = "latest_block"

// callLatestFinalizedBlock is the call name reported to rpcMetrics by
// LatestFinalizedBlock.
const callLatestFinalizedBlock = "latest_finalized_block"
167+
154168
func (m *RPCClientBase[HEAD]) LatestBlock(ctx context.Context) (HEAD, error) {
155169
// capture lifeCycleCh to ensure we are not updating chainInfo with observations related to previous life cycle
156170
ctx, cancel, lifeCycleCh := m.AcquireQueryCtx(ctx, m.ctxTimeout)
157171
defer cancel()
158172

173+
start := time.Now()
159174
head, err := m.latestBlock(ctx)
175+
latencyMs := float64(time.Since(start).Milliseconds())
176+
if err == nil && !head.IsValid() {
177+
err = errors.New("invalid head")
178+
}
179+
if m.rpcMetrics != nil {
180+
m.rpcMetrics.RecordRequest(ctx, callLatestBlock, latencyMs, err)
181+
}
160182
if err != nil {
161183
return head, err
162184
}
163185

164-
if !head.IsValid() {
165-
return head, errors.New("invalid head")
166-
}
167-
168186
m.OnNewHead(ctx, lifeCycleCh, head)
169187
return head, nil
170188
}
@@ -173,15 +191,19 @@ func (m *RPCClientBase[HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, e
173191
ctx, cancel, lifeCycleCh := m.AcquireQueryCtx(ctx, m.ctxTimeout)
174192
defer cancel()
175193

194+
start := time.Now()
176195
head, err := m.latestFinalizedBlock(ctx)
196+
latencyMs := float64(time.Since(start).Milliseconds())
197+
if err == nil && !head.IsValid() {
198+
err = errors.New("invalid head")
199+
}
200+
if m.rpcMetrics != nil {
201+
m.rpcMetrics.RecordRequest(ctx, callLatestFinalizedBlock, latencyMs, err)
202+
}
177203
if err != nil {
178204
return head, err
179205
}
180206

181-
if !head.IsValid() {
182-
return head, errors.New("invalid head")
183-
}
184-
185207
m.OnNewFinalizedHead(ctx, lifeCycleCh, head)
186208
return head, nil
187209
}

multinode/rpc_client_base_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func newTestRPC(t *testing.T) *testRPC {
6767
}
6868

6969
rpc := &testRPC{}
70-
rpc.RPCClientBase = NewRPCClientBase[*testHead](cfg, requestTimeout, lggr, rpc.latestBlock, rpc.latestBlock)
70+
rpc.RPCClientBase = NewRPCClientBase[*testHead](cfg, requestTimeout, lggr, rpc.latestBlock, rpc.latestBlock, nil)
7171
t.Cleanup(rpc.Close)
7272
return rpc
7373
}

0 commit comments

Comments
 (0)