Skip to content

Commit ed9f645

Browse files
echatmancursoragent
andcommitted
otelriver: partition river.insert_count by skipped_as_duplicate
Today every job submitted to InsertMany is counted in river.insert_count even if River dropped it via UniqueOpts. That makes the insert metric unable to distinguish "we accepted 100 new jobs" from "we accepted 0 new jobs because all 100 were dedup'd against in-flight work" - both look identical on a chart. This adds a skipped_as_duplicate boolean attribute to river.insert_count data points and a duplicate_skipped_count span attribute on the river.insert_many span: - river.insert_count{skipped_as_duplicate:false} - jobs actually enqueued - river.insert_count{skipped_as_duplicate:true} - jobs dropped by UniqueOpts - span.duplicate_skipped_count - how many of the batch were skipped The sum across both insert_count data points still equals len(manyParams), so existing dashboards that sum the metric without filtering see no change. New dashboards / monitors can now compute a duplicate rate directly from the otel metric rather than needing a separate counter in each service that uses UniqueOpts. Tests cover all-duplicates, no-duplicates, and mixed-batch shapes. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 4291d07 commit ed9f645

2 files changed

Lines changed: 149 additions & 3 deletions

File tree

otelriver/middleware.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,32 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job
156156
duration := m.durationInPreferredUnit(time.Since(begin))
157157

158158
setStatus(attrs, statusIndex, span, panicked, err)
159+
160+
var skipped int64
161+
for _, r := range insertRes {
162+
if r != nil && r.UniqueSkippedAsDuplicate {
163+
skipped++
164+
}
165+
}
166+
159167
span.SetAttributes(attrs...) // set after finalizing status
168+
span.SetAttributes(attribute.Int64("duplicate_skipped_count", skipped))
160169

161170
// This allocates a new slice, so make sure to do it as few times as possible.
162171
measurementOpt := metric.WithAttributes(attrs...)
163172

164-
m.metrics.insertCount.Add(ctx, int64(len(manyParams)), measurementOpt)
173+
// Partition insert_count by skipped_as_duplicate so the metric
174+
// shows how many of the submitted jobs were dropped by UniqueOpts
175+
// vs. actually enqueued. Sum across both data points still equals
176+
// len(manyParams).
177+
if inserted := int64(len(manyParams)) - skipped; inserted > 0 {
178+
m.metrics.insertCount.Add(ctx, inserted,
179+
metric.WithAttributes(append(attrs, attribute.Bool("skipped_as_duplicate", false))...))
180+
}
181+
if skipped > 0 {
182+
m.metrics.insertCount.Add(ctx, skipped,
183+
metric.WithAttributes(append(attrs, attribute.Bool("skipped_as_duplicate", true))...))
184+
}
165185
m.metrics.insertManyCount.Add(ctx, 1, measurementOpt)
166186
m.metrics.insertManyDuration.Record(ctx, duration, measurementOpt)
167187
m.metrics.insertManyDurationHistogram.Record(ctx, duration, measurementOpt)

otelriver/middleware_test.go

Lines changed: 128 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ func TestMiddleware(t *testing.T) {
8282
require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString())
8383
require.Equal(t, "river.insert_many", span.Name)
8484
require.Equal(t, codes.Ok, span.Status.Code)
85+
require.EqualValues(t, 0, getAttribute(t, span.Attributes, "duplicate_skipped_count").AsInt64())
8586

8687
var (
8788
expectedAttrs = []attribute.KeyValue{
@@ -90,7 +91,13 @@ func TestMiddleware(t *testing.T) {
9091
metrics metricdata.ResourceMetrics
9192
)
9293
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
93-
requireSum(t, metrics, "river.insert_count", 1, expectedAttrs...)
94+
requireSumByAttrs(t, metrics, "river.insert_count", 1,
95+
attribute.String("status", "ok"),
96+
attribute.Bool("skipped_as_duplicate", false),
97+
)
98+
requireSumByAttrs(t, metrics, "river.insert_count", 0,
99+
attribute.Bool("skipped_as_duplicate", true),
100+
)
94101
requireSum(t, metrics, "river.insert_many_count", 1, expectedAttrs...)
95102
{
96103
metric, _ := requireGaugeNotEmpty(t, metrics, "river.insert_many_duration", expectedAttrs...)
@@ -122,6 +129,7 @@ func TestMiddleware(t *testing.T) {
122129
require.Equal(t, "river.insert_many", span.Name)
123130
require.Equal(t, codes.Error, span.Status.Code)
124131
require.Equal(t, "error from doInner", span.Status.Description)
132+
require.EqualValues(t, 0, getAttribute(t, span.Attributes, "duplicate_skipped_count").AsInt64())
125133

126134
var (
127135
expectedAttrs = []attribute.KeyValue{
@@ -130,7 +138,13 @@ func TestMiddleware(t *testing.T) {
130138
metrics metricdata.ResourceMetrics
131139
)
132140
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
133-
requireSum(t, metrics, "river.insert_count", 1, expectedAttrs...)
141+
requireSumByAttrs(t, metrics, "river.insert_count", 1,
142+
attribute.String("status", "error"),
143+
attribute.Bool("skipped_as_duplicate", false),
144+
)
145+
requireSumByAttrs(t, metrics, "river.insert_count", 0,
146+
attribute.Bool("skipped_as_duplicate", true),
147+
)
134148
requireSum(t, metrics, "river.insert_many_count", 1, expectedAttrs...)
135149
requireGaugeNotEmpty(t, metrics, "river.insert_many_duration", expectedAttrs...)
136150
requireHistogramCount(t, metrics, "river.insert_many_duration_histogram", 1, expectedAttrs...)
@@ -157,6 +171,7 @@ func TestMiddleware(t *testing.T) {
157171
require.Equal(t, "river.insert_many", span.Name)
158172
require.Equal(t, codes.Error, span.Status.Code)
159173
require.Equal(t, "panic", span.Status.Description)
174+
require.EqualValues(t, 0, getAttribute(t, span.Attributes, "duplicate_skipped_count").AsInt64())
160175

161176
var (
162177
expectedAttrs = []attribute.KeyValue{
@@ -191,6 +206,73 @@ func TestMiddleware(t *testing.T) {
191206
require.NoError(t, err)
192207
})
193208

209+
t.Run("InsertManyAllDuplicates", func(t *testing.T) {
210+
t.Parallel()
211+
212+
middleware, bundle := setup(t)
213+
214+
doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) {
215+
return []*rivertype.JobInsertResult{
216+
{Job: &rivertype.JobRow{ID: 1}, UniqueSkippedAsDuplicate: true},
217+
{Job: &rivertype.JobRow{ID: 2}, UniqueSkippedAsDuplicate: true},
218+
}, nil
219+
}
220+
221+
_, err := middleware.InsertMany(ctx, []*rivertype.JobInsertParams{{}, {}}, doInner)
222+
require.NoError(t, err)
223+
224+
spans := bundle.traceExporter.GetSpans()
225+
require.Len(t, spans, 1)
226+
require.EqualValues(t, 2, getAttribute(t, spans[0].Attributes, "duplicate_skipped_count").AsInt64())
227+
228+
var metrics metricdata.ResourceMetrics
229+
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
230+
requireSumByAttrs(t, metrics, "river.insert_count", 2,
231+
attribute.String("status", "ok"),
232+
attribute.Bool("skipped_as_duplicate", true),
233+
)
234+
requireSumByAttrs(t, metrics, "river.insert_count", 0,
235+
attribute.Bool("skipped_as_duplicate", false),
236+
)
237+
})
238+
239+
t.Run("InsertManyMixedDuplicates", func(t *testing.T) {
240+
t.Parallel()
241+
242+
middleware, bundle := setup(t)
243+
244+
doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) {
245+
return []*rivertype.JobInsertResult{
246+
{Job: &rivertype.JobRow{ID: 1}, UniqueSkippedAsDuplicate: false},
247+
{Job: &rivertype.JobRow{ID: 2}, UniqueSkippedAsDuplicate: true},
248+
{Job: &rivertype.JobRow{ID: 3}, UniqueSkippedAsDuplicate: false},
249+
{Job: &rivertype.JobRow{ID: 4}, UniqueSkippedAsDuplicate: true},
250+
{Job: &rivertype.JobRow{ID: 5}, UniqueSkippedAsDuplicate: false},
251+
}, nil
252+
}
253+
254+
_, err := middleware.InsertMany(ctx,
255+
[]*rivertype.JobInsertParams{{}, {}, {}, {}, {}}, doInner)
256+
require.NoError(t, err)
257+
258+
spans := bundle.traceExporter.GetSpans()
259+
require.Len(t, spans, 1)
260+
require.EqualValues(t, 2, getAttribute(t, spans[0].Attributes, "duplicate_skipped_count").AsInt64())
261+
262+
var metrics metricdata.ResourceMetrics
263+
require.NoError(t, bundle.metricReader.Collect(ctx, &metrics))
264+
requireSumByAttrs(t, metrics, "river.insert_count", 3,
265+
attribute.String("status", "ok"),
266+
attribute.Bool("skipped_as_duplicate", false),
267+
)
268+
requireSumByAttrs(t, metrics, "river.insert_count", 2,
269+
attribute.String("status", "ok"),
270+
attribute.Bool("skipped_as_duplicate", true),
271+
)
272+
// Sum across both data points should still equal len(manyParams).
273+
requireSumByAttrs(t, metrics, "river.insert_count", 5)
274+
})
275+
194276
t.Run("InsertManyDurationUnitMS", func(t *testing.T) {
195277
t.Parallel()
196278

@@ -706,3 +788,47 @@ func requireSum(t *testing.T, metrics metricdata.ResourceMetrics, name string, v
706788
metricdatatest.AssertHasAttributes(t, metric, attrs...)
707789
return metric, metricData
708790
}
791+
792+
// requireSumByAttrs asserts that the sum of all data points on the named
793+
// metric whose attributes are a superset of the given attrs equals the
794+
// expected value. A missing metric is treated as sum=0 so callers don't
795+
// need to distinguish "no data point" from "data point with value 0".
796+
// Pass no attrs to assert against the metric's grand total.
797+
func requireSumByAttrs(t *testing.T, metrics metricdata.ResourceMetrics, name string, expected int64, attrs ...attribute.KeyValue) {
798+
t.Helper()
799+
800+
_, metricData, ok := getMetric[metricdata.Sum[int64]](t, metrics, name)
801+
var got int64
802+
if ok {
803+
wantSet := attribute.NewSet(attrs...)
804+
for _, dp := range metricData.DataPoints {
805+
if attributeSetContains(dp.Attributes, wantSet) {
806+
got += dp.Value
807+
}
808+
}
809+
}
810+
if got != expected {
811+
t.Fatalf("sum of %s data points matching %v: got %d, want %d; data points: %v",
812+
name, attrs, got, expected, formatDataPoints(metricData.DataPoints))
813+
}
814+
}
815+
816+
func attributeSetContains(got, want attribute.Set) bool {
817+
iter := want.Iter()
818+
for iter.Next() {
819+
kv := iter.Attribute()
820+
gotVal, ok := got.Value(kv.Key)
821+
if !ok || gotVal != kv.Value {
822+
return false
823+
}
824+
}
825+
return true
826+
}
827+
828+
func formatDataPoints(dps []metricdata.DataPoint[int64]) string {
829+
out := make([]string, 0, len(dps))
830+
for _, dp := range dps {
831+
out = append(out, fmt.Sprintf("value=%d attrs=%v", dp.Value, dp.Attributes.ToSlice()))
832+
}
833+
return "[" + fmt.Sprint(out) + "]"
834+
}

0 commit comments

Comments
 (0)