Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Add `otelriver` hook support for River producer metrics, including `river.job_get_available_duration` and `river.job_get_available_count`.

## [0.10.0] - 2026-06-06

### Added
Expand Down
9 changes: 8 additions & 1 deletion otelriver/example_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,19 @@ import (
)

func ExampleMiddleware() {
middleware := otelriver.NewMiddleware(nil)

_, err := river.NewClient(riverpgxv5.New(nil), &river.Config{
Hooks: []rivertype.Hook{
// Install the OpenTelemetry middleware as a hook to emit producer
// metrics from this River client.
middleware,
},
Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})),
Middleware: []rivertype.Middleware{
// Install the OpenTelemetry middleware to run for all jobs inserted
// or worked by this River client.
otelriver.NewMiddleware(nil),
middleware,
},
TestOnly: true, // suitable only for use in tests; remove for live environments
})
Expand Down
32 changes: 30 additions & 2 deletions otelriver/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ type MiddlewareConfig struct {
TracerProvider trace.TracerProvider
}

// Middleware is a River middleware that emits OpenTelemetry metrics when jobs
// are inserted or worked.
// Middleware is a River middleware and hook that emits OpenTelemetry traces and
// metrics.
type Middleware struct {
river.HookDefaults
river.MiddlewareDefaults

config *MiddlewareConfig
Expand All @@ -75,6 +76,8 @@ type middlewareMetrics struct {
insertManyCount metric.Int64Counter
insertManyDuration metric.Float64Gauge
insertManyDurationHistogram metric.Float64Histogram
jobGetAvailableDuration metric.Float64Histogram
jobGetAvailableCount metric.Int64Histogram
messagingClientConsumedMessages metric.Int64Counter
messagingClientOperationDuration metric.Float64Histogram
messagingClientSentMessages metric.Int64Counter
Expand Down Expand Up @@ -117,6 +120,8 @@ func NewMiddleware(config *MiddlewareConfig) *Middleware {
insertManyCount: mustInt64Counter(meter, prefix+"insert_many_count", metric.WithDescription("Number of job batches inserted (all jobs are inserted in a batch, but batches may be one job)"), metric.WithUnit("{job_batch}")),
insertManyDuration: mustFloat64Gauge(meter, prefix+"insert_many_duration", metric.WithDescription("Duration of job batch insertion"), metric.WithUnit(durationUnit)),
insertManyDurationHistogram: mustFloat64Histogram(meter, prefix+"insert_many_duration_histogram", metric.WithDescription("Duration of job batch insertion (histogram)"), metric.WithUnit(durationUnit)),
jobGetAvailableDuration: mustFloat64Histogram(meter, prefix+"job_get_available_duration", metric.WithDescription("Duration of successful JobGetAvailable calls"), metric.WithUnit(durationUnit)),
jobGetAvailableCount: mustInt64Histogram(meter, prefix+"job_get_available_count", metric.WithDescription("Number of jobs locked by successful JobGetAvailable calls"), metric.WithUnit("{job}")),
workCount: mustInt64Counter(meter, prefix+"work_count", metric.WithDescription("Number of jobs worked"), metric.WithUnit("{job}")),
workDuration: mustFloat64Gauge(meter, prefix+"work_duration", metric.WithDescription("Duration of job being worked"), metric.WithUnit(durationUnit)),
workDurationHistogram: mustFloat64Histogram(meter, prefix+"work_duration_histogram", metric.WithDescription("Duration of job being worked (histogram)"), metric.WithUnit(durationUnit)),
Expand Down Expand Up @@ -213,6 +218,21 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job
return insertRes, err
}

func (m *Middleware) MetricEmit(ctx context.Context, params *rivertype.HookMetricEmitParams) {
if params == nil {
return
}

switch riverMetric := params.Metric.(type) {
case *rivertype.JobGetAvailableDurationMetric:
m.metrics.jobGetAvailableDuration.Record(ctx, m.durationInPreferredUnit(riverMetric.Duration),
metric.WithAttributes(attribute.String("queue", riverMetric.Queue)))
case *rivertype.JobGetAvailableCountMetric:
m.metrics.jobGetAvailableCount.Record(ctx, int64(riverMetric.Count),
metric.WithAttributes(attribute.String("queue", riverMetric.Queue)))
}
}

func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error {
spanName := prefix + "work"
if m.config.EnableWorkSpanJobKindSuffix {
Expand Down Expand Up @@ -328,6 +348,14 @@ func mustFloat64Histogram(meter metric.Meter, name string, options ...metric.Flo
return metric
}

func mustInt64Histogram(meter metric.Meter, name string, options ...metric.Int64HistogramOption) metric.Int64Histogram {
metric, err := meter.Int64Histogram(name, options...)
if err != nil {
panic(err)
}
return metric
}

func mustInt64Counter(meter metric.Meter, name string, options ...metric.Int64CounterOption) metric.Int64Counter {
metric, err := meter.Int64Counter(name, options...)
if err != nil {
Expand Down
72 changes: 72 additions & 0 deletions otelriver/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
// Verify interface compliance.
var (
_ rivertype.JobInsertMiddleware = &Middleware{}
_ rivertype.HookMetricEmit = &Middleware{}
_ rivertype.WorkerMiddleware = &Middleware{}
)

Expand Down Expand Up @@ -58,6 +59,68 @@ func TestMiddleware(t *testing.T) {
return setupConfig(t, &MiddlewareConfig{})
}

t.Run("MetricJobGetAvailableDuration", func(t *testing.T) {
t.Parallel()

middleware, bundle := setup(t)

middleware.MetricEmit(ctx, &rivertype.HookMetricEmitParams{
Metric: &rivertype.JobGetAvailableDurationMetric{
Duration: 2500 * time.Millisecond,
Queue: "critical",
},
})

var metrics metricdata.ResourceMetrics
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
metric, metricData := requireHistogramCount(t, metrics, "river.job_get_available_duration", 1,
attribute.String("queue", "critical"))
require.Equal(t, "s", metric.Unit)
require.InDelta(t, 2.5, metricData.DataPoints[0].Sum, 0.001)
})

t.Run("MetricJobGetAvailableDurationUnitMS", func(t *testing.T) {
t.Parallel()

middleware, bundle := setupConfig(t, &MiddlewareConfig{
DurationUnit: "ms",
})

middleware.MetricEmit(ctx, &rivertype.HookMetricEmitParams{
Metric: &rivertype.JobGetAvailableDurationMetric{
Duration: 2500 * time.Millisecond,
Queue: "critical",
},
})

var metrics metricdata.ResourceMetrics
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
metric, metricData := requireHistogramCount(t, metrics, "river.job_get_available_duration", 1,
attribute.String("queue", "critical"))
require.Equal(t, "ms", metric.Unit)
require.InDelta(t, 2500.0, metricData.DataPoints[0].Sum, 0.001)
})

t.Run("MetricJobGetAvailableCount", func(t *testing.T) {
t.Parallel()

middleware, bundle := setup(t)

middleware.MetricEmit(ctx, &rivertype.HookMetricEmitParams{
Metric: &rivertype.JobGetAvailableCountMetric{
Count: 42,
Queue: "critical",
},
})

var metrics metricdata.ResourceMetrics
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
metric, metricData := requireInt64HistogramCount(t, metrics, "river.job_get_available_count", 1,
attribute.String("queue", "critical"))
require.Equal(t, "{job}", metric.Unit)
require.EqualValues(t, 42, metricData.DataPoints[0].Sum)
})

t.Run("InsertManySuccess", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -827,6 +890,15 @@ func requireHistogramCount(t *testing.T, metrics metricdata.ResourceMetrics, nam
return metric, metricData
}

func requireInt64HistogramCount(t *testing.T, metrics metricdata.ResourceMetrics, name string, count uint64, attrs ...attribute.KeyValue) (metricdata.Metrics, metricdata.Histogram[int64]) { //nolint:unparam
t.Helper()

metric, metricData := requireMetric[metricdata.Histogram[int64]](t, metrics, name)
require.Equal(t, count, metricData.DataPoints[0].Count)
metricdatatest.AssertHasAttributes(t, metric, attrs...)
return metric, metricData
}

func requireNoMetric(t *testing.T, metrics metricdata.ResourceMetrics, name string) {
t.Helper()

Expand Down
Loading