Skip to content

Commit 34fe2ad

Browse files
authored
Merge branch 'master' into echatman/skipped-as-duplicate-tracking
2 parents 14c6ccd + 62c122b commit 34fe2ad

3 files changed

Lines changed: 61 additions & 20 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10-
### Added
11-
1210
- Added `unique_skipped_as_duplicate` attributes to otel `insert_many` spans and `insert_count` metric. [PR #58](https://github.com/riverqueue/rivercontrib/pull/58).
1311

12+
### Changed
13+
14+
- Record snoozed jobs with status `ok` instead of `error` in `otelriver` middleware. Add new `snooze.duration` span attribute. [PR #59](https://github.com/riverqueue/rivercontrib/pull/59).
15+
1416
## [0.8.0] - 2026-05-15
1517

1618
### Added

otelriver/middleware.go

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,11 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
230230
defer func() {
231231
duration := m.durationInPreferredUnit(time.Since(begin))
232232

233+
var (
234+
cancelErr *river.JobCancelError
235+
snoozeErr *river.JobSnoozeError
236+
)
237+
233238
if err != nil {
234239
var batchResult interface { // To be superseded if riverbatch.MultiError is moved to rivertype.
235240
ErrorsByID() map[int64]error
@@ -238,11 +243,6 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
238243
err = batchResult.ErrorsByID()[job.ID]
239244
}
240245

241-
var (
242-
cancelErr *river.JobCancelError
243-
snoozeErr *river.JobSnoozeError
244-
)
245-
246246
switch {
247247
case errors.As(err, &cancelErr):
248248
attrs = append(attrs, attribute.Bool("cancel", true))
@@ -253,16 +253,17 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu
253253

254254
setStatus(attrs, statusIndex, span, panicked, err)
255255

256-
{
257-
// Add some higher cardinality attributes to spans, but keep them
258-
// out of metrics given it's been traditional wisdom that metric
259-
// attribute sets shouldn't be too large.
260-
attrs := append(attrs,
261-
attribute.Int64("id", job.ID),
262-
attribute.String("created_at", job.CreatedAt.Format(time.RFC3339)),
263-
attribute.String("scheduled_at", job.ScheduledAt.Format(time.RFC3339)),
264-
)
265-
span.SetAttributes(attrs...) // set after finalizing status
256+
// Add some higher cardinality attributes to spans, but keep them
257+
// out of metrics given it's been traditional wisdom that metric
258+
// attribute sets shouldn't be too large.
259+
span.SetAttributes(
260+
attribute.Int64("id", job.ID),
261+
attribute.String("created_at", job.CreatedAt.Format(time.RFC3339)),
262+
attribute.String("scheduled_at", job.ScheduledAt.Format(time.RFC3339)),
263+
)
264+
span.SetAttributes(attrs...) // set after finalizing status
265+
if snoozeErr != nil {
266+
span.SetAttributes(attribute.String("snooze.duration", snoozeErr.Duration.String()))
266267
}
267268

268269
// This allocates a new slice, so make sure to do it as few times as possible.
@@ -336,6 +337,13 @@ func setStatus(attrs []attribute.KeyValue, statusIndex int, span trace.Span, pan
336337
case panicked:
337338
attrs[statusIndex] = attribute.String("status", "panic")
338339
span.SetStatus(codes.Error, "panic")
340+
case errors.Is(err, &river.JobSnoozeError{}):
341+
// Snooze is flow control, not failure: the job will be retried
342+
// later. Record as ok so it doesn't pollute error rates; the
343+
// snooze:true span/metric attribute (set by the caller) keeps
344+
// snoozes queryable as a dimension.
345+
attrs[statusIndex] = attribute.String("status", "ok")
346+
span.SetStatus(codes.Ok, "")
339347
case err != nil:
340348
attrs[statusIndex] = attribute.String("status", "error")
341349
span.SetStatus(codes.Error, err.Error())

otelriver/middleware_test.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -469,19 +469,37 @@ func TestMiddleware(t *testing.T) {
469469
middleware, bundle := setup(t)
470470

471471
doInner := func(ctx context.Context) error {
472-
return fmt.Errorf("wrapped job snooze: %w", &rivertype.JobSnoozeError{})
472+
return fmt.Errorf("wrapped job snooze: %w", &rivertype.JobSnoozeError{Duration: 5 * time.Second})
473473
}
474474

475475
err := middleware.Work(ctx, &rivertype.JobRow{
476476
Kind: "no_op",
477477
}, doInner)
478-
require.EqualError(t, err, "wrapped job snooze: JobSnoozeError: 0s")
478+
require.EqualError(t, err, "wrapped job snooze: JobSnoozeError: 5s")
479479

480480
spans := bundle.traceExporter.GetSpans()
481481
require.Len(t, spans, 1)
482482

483483
span := spans[0]
484484
require.True(t, getAttribute(t, span.Attributes, "snooze").AsBool())
485+
// Snooze is flow control, not failure: span status is ok and the
486+
// snooze.duration is recorded as a span-only attribute.
487+
require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString())
488+
require.Equal(t, codes.Ok, span.Status.Code)
489+
require.Equal(t, "5s", getAttribute(t, span.Attributes, "snooze.duration").AsString())
490+
491+
// The metric records status:ok with snooze:true so snoozes remain
492+
// queryable as a metric dimension without counting against the
493+
// error rate.
494+
var (
495+
expectedAttrs = []attribute.KeyValue{
496+
attribute.String("status", "ok"),
497+
attribute.Bool("snooze", true),
498+
}
499+
metrics metricdata.ResourceMetrics
500+
)
501+
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
502+
requireSum(t, metrics, "river.work_count", 1, expectedAttrs...)
485503
})
486504

487505
t.Run("WorkBatchResultWithJobError", func(t *testing.T) {
@@ -558,7 +576,7 @@ func TestMiddleware(t *testing.T) {
558576

559577
doInner := func(ctx context.Context) error {
560578
return &fakeBatchError{errorsByID: map[int64]error{
561-
123: &rivertype.JobSnoozeError{},
579+
123: &rivertype.JobSnoozeError{Duration: 7 * time.Second},
562580
}}
563581
}
564582

@@ -570,6 +588,19 @@ func TestMiddleware(t *testing.T) {
570588

571589
span := spans[0]
572590
require.True(t, getAttribute(t, span.Attributes, "snooze").AsBool())
591+
require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString())
592+
require.Equal(t, codes.Ok, span.Status.Code)
593+
require.Equal(t, "7s", getAttribute(t, span.Attributes, "snooze.duration").AsString())
594+
595+
var (
596+
expectedAttrs = []attribute.KeyValue{
597+
attribute.String("status", "ok"),
598+
attribute.Bool("snooze", true),
599+
}
600+
metrics metricdata.ResourceMetrics
601+
)
602+
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
603+
requireSum(t, metrics, "river.work_count", 1, expectedAttrs...)
573604
})
574605

575606
t.Run("WorkPanic", func(t *testing.T) {

0 commit comments

Comments
 (0)