Skip to content

Commit 20a6fb4

Browse files
echatmancursoragent
andcommitted
otelriver: Add kinds span attribute to insert_many spans
Emit the distinct, sorted job kinds present in each batch as a `kinds` string-slice attribute on `river.insert_many` spans. This surfaces which kinds were inserted in each batch for trace queries without affecting metric cardinality. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 8d9999e commit 20a6fb4

3 files changed

Lines changed: 44 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010
### Added
1111

1212
- Added `unique_skipped_as_duplicate` attributes to otel `insert_many` spans and `insert_count` metric. [PR #58](https://github.com/riverqueue/rivercontrib/pull/58).
13+
- Add `kinds` span attribute to `otelriver` `insert_many` spans listing the distinct job kinds in each batch. [PR #TBD](https://github.com/riverqueue/rivercontrib/pull/TBD).
1314

1415
### Changed
1516

otelriver/middleware.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"cmp"
55
"context"
66
"errors"
7+
"slices"
78
"time"
89

910
"go.opentelemetry.io/otel"
@@ -164,8 +165,18 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job
164165
}
165166
}
166167

168+
kinds := make([]string, 0, len(manyParams))
169+
for _, p := range manyParams {
170+
kinds = append(kinds, p.Kind)
171+
}
172+
slices.Sort(kinds)
173+
kinds = slices.Compact(kinds)
174+
167175
span.SetAttributes(attrs...) // set after finalizing status
168-
span.SetAttributes(attribute.Int64("unique_skipped_as_duplicate_count", skipped))
176+
span.SetAttributes(
177+
attribute.Int64("unique_skipped_as_duplicate_count", skipped),
178+
attribute.StringSlice("kinds", kinds),
179+
)
169180

170181
// This allocates a new slice, so make sure to do it as few times as possible.
171182
measurementOpt := metric.WithAttributes(attrs...)

otelriver/middleware_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func TestMiddleware(t *testing.T) {
8383
require.Equal(t, "river.insert_many", span.Name)
8484
require.Equal(t, codes.Ok, span.Status.Code)
8585
require.EqualValues(t, 0, getAttribute(t, span.Attributes, "unique_skipped_as_duplicate_count").AsInt64())
86+
require.Equal(t, []string{"no_op"}, getAttribute(t, span.Attributes, "kinds").AsStringSlice())
8687

8788
var (
8889
expectedAttrs = []attribute.KeyValue{
@@ -130,6 +131,7 @@ func TestMiddleware(t *testing.T) {
130131
require.Equal(t, codes.Error, span.Status.Code)
131132
require.Equal(t, "error from doInner", span.Status.Description)
132133
require.EqualValues(t, 0, getAttribute(t, span.Attributes, "unique_skipped_as_duplicate_count").AsInt64())
134+
require.Equal(t, []string{"no_op"}, getAttribute(t, span.Attributes, "kinds").AsStringSlice())
133135

134136
var (
135137
expectedAttrs = []attribute.KeyValue{
@@ -172,6 +174,7 @@ func TestMiddleware(t *testing.T) {
172174
require.Equal(t, codes.Error, span.Status.Code)
173175
require.Equal(t, "panic", span.Status.Description)
174176
require.EqualValues(t, 0, getAttribute(t, span.Attributes, "unique_skipped_as_duplicate_count").AsInt64())
177+
require.Equal(t, []string{"no_op"}, getAttribute(t, span.Attributes, "kinds").AsStringSlice())
175178

176179
var (
177180
expectedAttrs = []attribute.KeyValue{
@@ -273,6 +276,34 @@ func TestMiddleware(t *testing.T) {
273276
requireSumByAttrs(t, metrics, "river.insert_count", 5)
274277
})
275278

279+
t.Run("InsertManyMixedKinds", func(t *testing.T) {
280+
t.Parallel()
281+
282+
middleware, bundle := setup(t)
283+
284+
doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) {
285+
return []*rivertype.JobInsertResult{
286+
{Job: &rivertype.JobRow{ID: 1}},
287+
{Job: &rivertype.JobRow{ID: 2}},
288+
{Job: &rivertype.JobRow{ID: 3}},
289+
}, nil
290+
}
291+
292+
_, err := middleware.InsertMany(ctx, []*rivertype.JobInsertParams{
293+
{Kind: "email_send"},
294+
{Kind: "notification"},
295+
{Kind: "email_send"},
296+
}, doInner)
297+
require.NoError(t, err)
298+
299+
spans := bundle.traceExporter.GetSpans()
300+
require.Len(t, spans, 1)
301+
// kinds are deduplicated and sorted so identical batches produce
302+
// identical span attributes.
303+
require.Equal(t, []string{"email_send", "notification"},
304+
getAttribute(t, spans[0].Attributes, "kinds").AsStringSlice())
305+
})
306+
276307
t.Run("InsertManyDurationUnitMS", func(t *testing.T) {
277308
t.Parallel()
278309

0 commit comments

Comments
 (0)