Skip to content

Commit 629ec4c

Browse files
committed
Add support for metrics for time to lock jobs and locked count
Here, build on the proposal in [1] to add a metric hook to River, and start emitting metrics for the time to lock jobs and the number of jobs locked. Especially the first metric is generally very useful for looking for queue degradation due to dead tuples. [1] riverqueue/river#1285
1 parent 0446a98 commit 629ec4c

4 files changed

Lines changed: 114 additions & 3 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- Add `otelriver` hook support for River producer metrics, including `river.job_get_available_duration` and `river.job_get_available_count`.
13+
1014
## [0.10.0] - 2026-06-06
1115

1216
### Added

otelriver/example_middleware_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,19 @@ import (
1212
)
1313

1414
func ExampleMiddleware() {
15+
middleware := otelriver.NewMiddleware(nil)
16+
1517
_, err := river.NewClient(riverpgxv5.New(nil), &river.Config{
18+
Hooks: []rivertype.Hook{
19+
// Install the OpenTelemetry middleware as a hook to emit producer
20+
// metrics from this River client.
21+
middleware,
22+
},
1623
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})),
1724
Middleware: []rivertype.Middleware{
1825
// Install the OpenTelemetry middleware to run for all jobs inserted
1926
// or worked by this River client.
20-
otelriver.NewMiddleware(nil),
27+
middleware,
2128
},
2229
TestOnly: true, // suitable only for use in tests; remove for live environments
2330
})

otelriver/middleware.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,10 @@ type MiddlewareConfig struct {
5858
TracerProvider trace.TracerProvider
5959
}
6060

61-
// Middleware is a River middleware that emits OpenTelemetry metrics when jobs
62-
// are inserted or worked.
61+
// Middleware is a River middleware and hook that emits OpenTelemetry traces and
62+
// metrics.
6363
type Middleware struct {
64+
river.HookDefaults
6465
river.MiddlewareDefaults
6566

6667
config *MiddlewareConfig
@@ -75,6 +76,8 @@ type middlewareMetrics struct {
7576
insertManyCount metric.Int64Counter
7677
insertManyDuration metric.Float64Gauge
7778
insertManyDurationHistogram metric.Float64Histogram
79+
jobGetAvailableDuration metric.Float64Histogram
80+
jobGetAvailableCount metric.Int64Histogram
7881
messagingClientConsumedMessages metric.Int64Counter
7982
messagingClientOperationDuration metric.Float64Histogram
8083
messagingClientSentMessages metric.Int64Counter
@@ -117,6 +120,8 @@ func NewMiddleware(config *MiddlewareConfig) *Middleware {
117120
insertManyCount: mustInt64Counter(meter, prefix+"insert_many_count", metric.WithDescription("Number of job batches inserted (all jobs are inserted in a batch, but batches may be one job)"), metric.WithUnit("{job_batch}")),
118121
insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit(durationUnit)),
119122
insertManyDurationHistogram: mustFloat64Histogram(meter, prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit(durationUnit)),
123+
jobGetAvailableDuration: mustFloat64Histogram(meter, prefix+"job_get_available_duration", metric.WithDescription("Duration of successful JobGetAvailable calls"), metric.WithUnit(durationUnit)),
124+
jobGetAvailableCount: mustInt64Histogram(meter, prefix+"job_get_available_count", metric.WithDescription("Number of jobs locked by successful JobGetAvailable calls"), metric.WithUnit("{job}")),
120125
workCount: mustInt64Counter(meter, prefix+"work_count", metric.WithDescription("Number of jobs worked"), metric.WithUnit("{job}")),
121126
workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit(durationUnit)),
122127
workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit(durationUnit)),
@@ -213,6 +218,21 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job
213218
return insertRes, err
214219
}
215220

221+
func (m *Middleware) MetricEmit(ctx context.Context, params *rivertype.HookMetricEmitParams) {
222+
if params == nil {
223+
return
224+
}
225+
226+
switch riverMetric := params.Metric.(type) {
227+
case *rivertype.JobGetAvailableDurationMetric:
228+
m.metrics.jobGetAvailableDuration.Record(ctx, m.durationInPreferredUnit(riverMetric.Duration),
229+
metric.WithAttributes(attribute.String("queue", riverMetric.Queue)))
230+
case *rivertype.JobGetAvailableCountMetric:
231+
m.metrics.jobGetAvailableCount.Record(ctx, int64(riverMetric.Count),
232+
metric.WithAttributes(attribute.String("queue", riverMetric.Queue)))
233+
}
234+
}
235+
216236
func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error {
217237
spanName := prefix + "work"
218238
if m.config.EnableWorkSpanJobKindSuffix {
@@ -328,6 +348,14 @@ func mustFloat64Histogram(meter metric.Meter, name string, options ...metric.Flo
328348
return metric
329349
}
330350

351+
func mustInt64Histogram(meter metric.Meter, name string, options ...metric.Int64HistogramOption) metric.Int64Histogram {
352+
metric, err := meter.Int64Histogram(name, options...)
353+
if err != nil {
354+
panic(err)
355+
}
356+
return metric
357+
}
358+
331359
func mustInt64Counter(meter metric.Meter, name string, options ...metric.Int64CounterOption) metric.Int64Counter {
332360
metric, err := meter.Int64Counter(name, options...)
333361
if err != nil {

otelriver/middleware_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
// Verify interface compliance.
2323
var (
2424
_ rivertype.JobInsertMiddleware = &Middleware{}
25+
_ rivertype.HookMetricEmit = &Middleware{}
2526
_ rivertype.WorkerMiddleware = &Middleware{}
2627
)
2728

@@ -58,6 +59,68 @@ func TestMiddleware(t *testing.T) {
5859
return setupConfig(t, &MiddlewareConfig{})
5960
}
6061

62+
t.Run("MetricJobGetAvailableDuration", func(t *testing.T) {
63+
t.Parallel()
64+
65+
middleware, bundle := setup(t)
66+
67+
middleware.MetricEmit(ctx, &rivertype.HookMetricEmitParams{
68+
Metric: &rivertype.JobGetAvailableDurationMetric{
69+
Duration: 2500 * time.Millisecond,
70+
Queue: "critical",
71+
},
72+
})
73+
74+
var metrics metricdata.ResourceMetrics
75+
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
76+
metric, metricData := requireHistogramCount(t, metrics, "river.job_get_available_duration", 1,
77+
attribute.String("queue", "critical"))
78+
require.Equal(t, "s", metric.Unit)
79+
require.InDelta(t, 2.5, metricData.DataPoints[0].Sum, 0.001)
80+
})
81+
82+
t.Run("MetricJobGetAvailableDurationUnitMS", func(t *testing.T) {
83+
t.Parallel()
84+
85+
middleware, bundle := setupConfig(t, &MiddlewareConfig{
86+
DurationUnit: "ms",
87+
})
88+
89+
middleware.MetricEmit(ctx, &rivertype.HookMetricEmitParams{
90+
Metric: &rivertype.JobGetAvailableDurationMetric{
91+
Duration: 2500 * time.Millisecond,
92+
Queue: "critical",
93+
},
94+
})
95+
96+
var metrics metricdata.ResourceMetrics
97+
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
98+
metric, metricData := requireHistogramCount(t, metrics, "river.job_get_available_duration", 1,
99+
attribute.String("queue", "critical"))
100+
require.Equal(t, "ms", metric.Unit)
101+
require.InDelta(t, 2500.0, metricData.DataPoints[0].Sum, 0.001)
102+
})
103+
104+
t.Run("MetricJobGetAvailableCount", func(t *testing.T) {
105+
t.Parallel()
106+
107+
middleware, bundle := setup(t)
108+
109+
middleware.MetricEmit(ctx, &rivertype.HookMetricEmitParams{
110+
Metric: &rivertype.JobGetAvailableCountMetric{
111+
Count: 42,
112+
Queue: "critical",
113+
},
114+
})
115+
116+
var metrics metricdata.ResourceMetrics
117+
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
118+
metric, metricData := requireInt64HistogramCount(t, metrics, "river.job_get_available_count", 1,
119+
attribute.String("queue", "critical"))
120+
require.Equal(t, "{job}", metric.Unit)
121+
require.EqualValues(t, 42, metricData.DataPoints[0].Sum)
122+
})
123+
61124
t.Run("InsertManySuccess", func(t *testing.T) {
62125
t.Parallel()
63126

@@ -827,6 +890,15 @@ func requireHistogramCount(t *testing.T, metrics metricdata.ResourceMetrics, nam
827890
return metric, metricData
828891
}
829892

893+
func requireInt64HistogramCount(t *testing.T, metrics metricdata.ResourceMetrics, name string, count uint64, attrs ...attribute.KeyValue) (metricdata.Metrics, metricdata.Histogram[int64]) { //nolint:unparam
894+
t.Helper()
895+
896+
metric, metricData := requireMetric[metricdata.Histogram[int64]](t, metrics, name)
897+
require.Equal(t, count, metricData.DataPoints[0].Count)
898+
metricdatatest.AssertHasAttributes(t, metric, attrs...)
899+
return metric, metricData
900+
}
901+
830902
func requireNoMetric(t *testing.T, metrics metricdata.ResourceMetrics, name string) {
831903
t.Helper()
832904

0 commit comments

Comments
 (0)