Skip to content

Commit 62c122b

Browse files
otelriver: treat JobSnoozeError as flow control, not a failure (#59)
Today a job that returns JobSnoozeError (the standard way to ask River to re-schedule it) flows through the same path as a real failure: - the span status is set to codes.Error with the snooze error as the description - the river.work_count metric records status:error,snooze:true That means snoozes pollute span error rates in tracing UIs and force every chart that wants a true error rate to defensively exclude snooze:true. Snoozes aren't failures; they're flow control - the job asked to run later and will be retried. This change makes setStatus recognize *rivertype.JobSnoozeError and return status:ok + codes.Ok. The snooze:true span/metric attribute (already set by the existing switch on errors.As) is preserved so snoozes remain queryable as a dimension. river.work_count snooze data points are now emitted as status:ok,snooze:true. It also adds a snooze.duration string attribute to the span (e.g. "5s"), making it easy to see how long the job asked to be deferred without having to parse the error string. The duration is span-only because attaching it to the metric would explode cardinality. Tests cover both the wrapped-error path (JobSnoozeError test) and the batch-result path (WorkBatchResultWithJobSnoozeError test). Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 4291d07 commit 62c122b

3 files changed

Lines changed: 61 additions & 18 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Changed
11+
12+
- Record snoozed jobs with status `ok` instead of `error` in `otelriver` middleware. Add new `snooze.duration` span attribute. [PR #59](https://github.com/riverqueue/rivercontrib/pull/59).
13+
1014
## [0.8.0] - 2026-05-15
1115

1216
### Added

otelriver/middleware.go

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,11 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
210210
defer func() {
211211
duration := m.durationInPreferredUnit(time.Since(begin))
212212

213+
var (
214+
cancelErr *river.JobCancelError
215+
snoozeErr *river.JobSnoozeError
216+
)
217+
213218
if err != nil {
214219
var batchResult interface { // To be superseded if riverbatch.MultiError is moved to rivertype.
215220
ErrorsByID() map[int64]error
@@ -218,11 +223,6 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
218223
err = batchResult.ErrorsByID()[job.ID]
219224
}
220225

221-
var (
222-
cancelErr *river.JobCancelError
223-
snoozeErr *river.JobSnoozeError
224-
)
225-
226226
switch {
227227
case errors.As(err, &cancelErr):
228228
attrs = append(attrs, attribute.Bool("cancel", true))
@@ -233,16 +233,17 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
233233

234234
setStatus(attrs, statusIndex, span, panicked, err)
235235

236-
{
237-
// Add some higher cardinality attributes to spans, but keep them
238-
// out of metrics given it's been traditional wisdom that metric
239-
// attribute sets shouldn't be too large.
240-
attrs := append(attrs,
241-
attribute.Int64("id", job.ID),
242-
attribute.String("created_at", job.CreatedAt.Format(time.RFC3339)),
243-
attribute.String("scheduled_at", job.ScheduledAt.Format(time.RFC3339)),
244-
)
245-
span.SetAttributes(attrs...) // set after finalizing status
236+
// Add some higher cardinality attributes to spans, but keep them
237+
// out of metrics given it's been traditional wisdom that metric
238+
// attribute sets shouldn't be too large.
239+
span.SetAttributes(
240+
attribute.Int64("id", job.ID),
241+
attribute.String("created_at", job.CreatedAt.Format(time.RFC3339)),
242+
attribute.String("scheduled_at", job.ScheduledAt.Format(time.RFC3339)),
243+
)
244+
span.SetAttributes(attrs...) // set after finalizing status
245+
if snoozeErr != nil {
246+
span.SetAttributes(attribute.String("snooze.duration", snoozeErr.Duration.String()))
246247
}
247248

248249
// This allocates a new slice, so make sure to do it as few times as possible.
@@ -316,6 +317,13 @@ func setStatus(attrs []attribute.KeyValue, statusIndex int, span trace.Span, pan
316317
case panicked:
317318
attrs[statusIndex] = attribute.String("status", "panic")
318319
span.SetStatus(codes.Error, "panic")
320+
case errors.Is(err, &river.JobSnoozeError{}):
321+
// Snooze is flow control, not failure: the job will be retried
322+
// later. Record as ok so it doesn't pollute error rates; the
323+
// snooze:true span/metric attribute (set by the caller) keeps
324+
// snoozes queryable as a dimension.
325+
attrs[statusIndex] = attribute.String("status", "ok")
326+
span.SetStatus(codes.Ok, "")
319327
case err != nil:
320328
attrs[statusIndex] = attribute.String("status", "error")
321329
span.SetStatus(codes.Error, err.Error())

otelriver/middleware_test.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -387,19 +387,37 @@ func TestMiddleware(t *testing.T) {
387387
middleware, bundle := setup(t)
388388

389389
doInner := func(ctx context.Context) error {
390-
return fmt.Errorf("wrapped job snooze: %w", &rivertype.JobSnoozeError{})
390+
return fmt.Errorf("wrapped job snooze: %w", &rivertype.JobSnoozeError{Duration: 5 * time.Second})
391391
}
392392

393393
err := middleware.Work(ctx, &rivertype.JobRow{
394394
Kind: "no_op",
395395
}, doInner)
396-
require.EqualError(t, err, "wrapped job snooze: JobSnoozeError: 0s")
396+
require.EqualError(t, err, "wrapped job snooze: JobSnoozeError: 5s")
397397

398398
spans := bundle.traceExporter.GetSpans()
399399
require.Len(t, spans, 1)
400400

401401
span := spans[0]
402402
require.True(t, getAttribute(t, span.Attributes, "snooze").AsBool())
403+
// Snooze is flow control, not failure: span status is ok and the
404+
// snooze.duration is recorded as a span-only attribute.
405+
require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString())
406+
require.Equal(t, codes.Ok, span.Status.Code)
407+
require.Equal(t, "5s", getAttribute(t, span.Attributes, "snooze.duration").AsString())
408+
409+
// The metric records status:ok with snooze:true so snoozes remain
410+
// queryable as a metric dimension without counting against the
411+
// error rate.
412+
var (
413+
expectedAttrs = []attribute.KeyValue{
414+
attribute.String("status", "ok"),
415+
attribute.Bool("snooze", true),
416+
}
417+
metrics metricdata.ResourceMetrics
418+
)
419+
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
420+
requireSum(t, metrics, "river.work_count", 1, expectedAttrs...)
403421
})
404422

405423
t.Run("WorkBatchResultWithJobError", func(t *testing.T) {
@@ -476,7 +494,7 @@ func TestMiddleware(t *testing.T) {
476494

477495
doInner := func(ctx context.Context) error {
478496
return &fakeBatchError{errorsByID: map[int64]error{
479-
123: &rivertype.JobSnoozeError{},
497+
123: &rivertype.JobSnoozeError{Duration: 7 * time.Second},
480498
}}
481499
}
482500

@@ -488,6 +506,19 @@ func TestMiddleware(t *testing.T) {
488506

489507
span := spans[0]
490508
require.True(t, getAttribute(t, span.Attributes, "snooze").AsBool())
509+
require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString())
510+
require.Equal(t, codes.Ok, span.Status.Code)
511+
require.Equal(t, "7s", getAttribute(t, span.Attributes, "snooze.duration").AsString())
512+
513+
var (
514+
expectedAttrs = []attribute.KeyValue{
515+
attribute.String("status", "ok"),
516+
attribute.Bool("snooze", true),
517+
}
518+
metrics metricdata.ResourceMetrics
519+
)
520+
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
521+
requireSum(t, metrics, "river.work_count", 1, expectedAttrs...)
491522
})
492523

493524
t.Run("WorkPanic", func(t *testing.T) {

0 commit comments

Comments
 (0)