Skip to content

Commit b85e1e6

Browse files
committed
test: add chip ingress otel metric assertions
1 parent 555d35c commit b85e1e6

2 files changed

Lines changed: 226 additions & 0 deletions

File tree

pkg/beholder/chip_ingress_batch_emitter_service_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ import (
1111
"github.com/stretchr/testify/require"
1212
"go.opentelemetry.io/otel"
1313
"go.opentelemetry.io/otel/attribute"
14+
"go.opentelemetry.io/otel/sdk/instrumentation"
1415
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
1516
"go.opentelemetry.io/otel/sdk/metric/metricdata"
17+
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
1618

1719
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
1820
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
@@ -466,6 +468,30 @@ func TestChipIngressBatchEmitterService_Metrics(t *testing.T) {
466468
require.NoError(t, emitter.Close())
467469

468470
rm := collectEmitterMetrics(t, reader)
471+
metricdatatest.AssertEqual(t, metricdata.ScopeMetrics{
472+
Scope: instrumentation.Scope{Name: "beholder/chip_ingress_batch_emitter"},
473+
Metrics: []metricdata.Metrics{
474+
{
475+
Name: "chip_ingress.events_sent",
476+
Description: "Total events successfully sent via PublishBatch",
477+
Unit: "{event}",
478+
Data: metricdata.Sum[int64]{
479+
Temporality: metricdata.CumulativeTemporality,
480+
IsMonotonic: true,
481+
DataPoints: []metricdata.DataPoint[int64]{
482+
{
483+
Attributes: attribute.NewSet(
484+
attribute.String("domain", "platform"),
485+
attribute.String("entity", "MetricEvent"),
486+
),
487+
Value: 1,
488+
},
489+
},
490+
},
491+
},
492+
},
493+
}, mustEmitterScopeMetrics(t, rm, "beholder/chip_ingress_batch_emitter"), metricdatatest.IgnoreTimestamp())
494+
469495
metric := mustEmitterMetric(t, rm, "chip_ingress.events_sent")
470496
sum, ok := metric.Data.(metricdata.Sum[int64])
471497
require.True(t, ok)
@@ -506,6 +532,30 @@ func TestChipIngressBatchEmitterService_Metrics(t *testing.T) {
506532
require.NoError(t, emitter.Close())
507533

508534
rm := collectEmitterMetrics(t, reader)
535+
metricdatatest.AssertEqual(t, metricdata.ScopeMetrics{
536+
Scope: instrumentation.Scope{Name: "beholder/chip_ingress_batch_emitter"},
537+
Metrics: []metricdata.Metrics{
538+
{
539+
Name: "chip_ingress.events_dropped",
540+
Description: "Total events dropped (buffer full or send failure)",
541+
Unit: "{event}",
542+
Data: metricdata.Sum[int64]{
543+
Temporality: metricdata.CumulativeTemporality,
544+
IsMonotonic: true,
545+
DataPoints: []metricdata.DataPoint[int64]{
546+
{
547+
Attributes: attribute.NewSet(
548+
attribute.String("domain", "platform"),
549+
attribute.String("entity", "MetricDropEvent"),
550+
),
551+
Value: 1,
552+
},
553+
},
554+
},
555+
},
556+
},
557+
}, mustEmitterScopeMetrics(t, rm, "beholder/chip_ingress_batch_emitter"), metricdatatest.IgnoreTimestamp())
558+
509559
metric := mustEmitterMetric(t, rm, "chip_ingress.events_dropped")
510560
sum, ok := metric.Data.(metricdata.Sum[int64])
511561
require.True(t, ok)
@@ -576,6 +626,17 @@ func mustEmitterMetric(t *testing.T, rm metricdata.ResourceMetrics, name string)
576626
return metricdata.Metrics{}
577627
}
578628

629+
func mustEmitterScopeMetrics(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.ScopeMetrics {
630+
t.Helper()
631+
for _, sm := range rm.ScopeMetrics {
632+
if sm.Scope.Name == name {
633+
return sm
634+
}
635+
}
636+
t.Fatalf("scope metrics %q not found", name)
637+
return metricdata.ScopeMetrics{}
638+
}
639+
579640
func mustEmitterInt64SumPoint(t *testing.T, sum metricdata.Sum[int64], k1, v1, k2, v2 string) metricdata.DataPoint[int64] {
580641
t.Helper()
581642
for _, dp := range sum.DataPoints {

pkg/chipingress/batch/client_test.go

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ import (
1313
"github.com/stretchr/testify/require"
1414
"go.opentelemetry.io/otel"
1515
"go.opentelemetry.io/otel/attribute"
16+
"go.opentelemetry.io/otel/sdk/instrumentation"
1617
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
1718
"go.opentelemetry.io/otel/sdk/metric/metricdata"
19+
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
1820

1921
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
2022
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks"
@@ -1157,6 +1159,77 @@ func TestBatchClient_Metrics(t *testing.T) {
11571159

11581160
client.Stop()
11591161
rm := collectResourceMetrics(t, reader)
1162+
metricdatatest.AssertEqual(t, metricdata.ScopeMetrics{
1163+
Scope: instrumentation.Scope{Name: "chipingress/batch_client"},
1164+
Metrics: []metricdata.Metrics{
1165+
{
1166+
Name: "chip_ingress.batch.send_requests_total",
1167+
Description: "Total PublishBatch requests sent by batch client",
1168+
Unit: "{request}",
1169+
Data: metricdata.Sum[int64]{
1170+
Temporality: metricdata.CumulativeTemporality,
1171+
IsMonotonic: true,
1172+
DataPoints: []metricdata.DataPoint[int64]{
1173+
{Attributes: attribute.NewSet(attribute.String("status", "success"))},
1174+
},
1175+
},
1176+
},
1177+
{
1178+
Name: "chip_ingress.batch.request_size_messages",
1179+
Description: "PublishBatch request size measured in number of events",
1180+
Unit: "{event}",
1181+
Data: metricdata.Histogram[int64]{
1182+
Temporality: metricdata.CumulativeTemporality,
1183+
DataPoints: []metricdata.HistogramDataPoint[int64]{
1184+
{Attributes: attribute.NewSet(attribute.Int("max_batch_size", 1))},
1185+
},
1186+
},
1187+
},
1188+
{
1189+
Name: "chip_ingress.batch.request_size_bytes",
1190+
Description: "PublishBatch request size measured in bytes",
1191+
Unit: "By",
1192+
Data: metricdata.Histogram[int64]{
1193+
Temporality: metricdata.CumulativeTemporality,
1194+
DataPoints: []metricdata.HistogramDataPoint[int64]{
1195+
{Attributes: attribute.NewSet(attribute.Int("max_grpc_request_size_bytes", 2048))},
1196+
},
1197+
},
1198+
},
1199+
{
1200+
Name: "chip_ingress.batch.request_latency_ms",
1201+
Description: "PublishBatch end-to-end latency in milliseconds",
1202+
Unit: "ms",
1203+
Data: metricdata.Histogram[float64]{
1204+
Temporality: metricdata.CumulativeTemporality,
1205+
DataPoints: []metricdata.HistogramDataPoint[float64]{
1206+
{Attributes: attribute.NewSet(attribute.String("status", "success"))},
1207+
},
1208+
},
1209+
},
1210+
{
1211+
Name: "chip_ingress.batch.config.info",
1212+
Description: "Batch client configuration info metric",
1213+
Unit: "{info}",
1214+
Data: metricdata.Gauge[int64]{
1215+
DataPoints: []metricdata.DataPoint[int64]{
1216+
{
1217+
Attributes: attribute.NewSet(
1218+
attribute.Int("max_batch_size", 1),
1219+
attribute.Int("message_buffer_size", 10),
1220+
attribute.Int("max_concurrent_sends", 1),
1221+
attribute.Int64("batch_interval_ms", 1000),
1222+
attribute.Int64("max_publish_timeout_ms", 5000),
1223+
attribute.Int64("shutdown_timeout_ms", 5000),
1224+
attribute.Bool("clone_event", true),
1225+
attribute.Int("max_grpc_request_size_bytes", 2048),
1226+
),
1227+
},
1228+
},
1229+
},
1230+
},
1231+
},
1232+
}, mustScopeMetrics(t, rm, "chipingress/batch_client"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
11601233

11611234
reqTotal := mustMetric(t, rm, "chip_ingress.batch.send_requests_total")
11621235
reqSum, ok := reqTotal.Data.(metricdata.Sum[int64])
@@ -1222,6 +1295,87 @@ func TestBatchClient_Metrics(t *testing.T) {
12221295

12231296
client.Stop()
12241297
rm := collectResourceMetrics(t, reader)
1298+
metricdatatest.AssertEqual(t, metricdata.ScopeMetrics{
1299+
Scope: instrumentation.Scope{Name: "chipingress/batch_client"},
1300+
Metrics: []metricdata.Metrics{
1301+
{
1302+
Name: "chip_ingress.batch.send_requests_total",
1303+
Description: "Total PublishBatch requests sent by batch client",
1304+
Unit: "{request}",
1305+
Data: metricdata.Sum[int64]{
1306+
Temporality: metricdata.CumulativeTemporality,
1307+
IsMonotonic: true,
1308+
DataPoints: []metricdata.DataPoint[int64]{
1309+
{Attributes: attribute.NewSet(attribute.String("status", "failure"))},
1310+
},
1311+
},
1312+
},
1313+
{
1314+
Name: "chip_ingress.batch.send_failures_total",
1315+
Description: "Total failed PublishBatch requests sent by batch client",
1316+
Unit: "{request}",
1317+
Data: metricdata.Sum[int64]{
1318+
Temporality: metricdata.CumulativeTemporality,
1319+
IsMonotonic: true,
1320+
DataPoints: []metricdata.DataPoint[int64]{{}},
1321+
},
1322+
},
1323+
{
1324+
Name: "chip_ingress.batch.request_size_messages",
1325+
Description: "PublishBatch request size measured in number of events",
1326+
Unit: "{event}",
1327+
Data: metricdata.Histogram[int64]{
1328+
Temporality: metricdata.CumulativeTemporality,
1329+
DataPoints: []metricdata.HistogramDataPoint[int64]{
1330+
{Attributes: attribute.NewSet(attribute.Int("max_batch_size", 1))},
1331+
},
1332+
},
1333+
},
1334+
{
1335+
Name: "chip_ingress.batch.request_size_bytes",
1336+
Description: "PublishBatch request size measured in bytes",
1337+
Unit: "By",
1338+
Data: metricdata.Histogram[int64]{
1339+
Temporality: metricdata.CumulativeTemporality,
1340+
DataPoints: []metricdata.HistogramDataPoint[int64]{
1341+
{Attributes: attribute.NewSet(attribute.Int("max_grpc_request_size_bytes", 16*1024*1024))},
1342+
},
1343+
},
1344+
},
1345+
{
1346+
Name: "chip_ingress.batch.request_latency_ms",
1347+
Description: "PublishBatch end-to-end latency in milliseconds",
1348+
Unit: "ms",
1349+
Data: metricdata.Histogram[float64]{
1350+
Temporality: metricdata.CumulativeTemporality,
1351+
DataPoints: []metricdata.HistogramDataPoint[float64]{
1352+
{Attributes: attribute.NewSet(attribute.String("status", "failure"))},
1353+
},
1354+
},
1355+
},
1356+
{
1357+
Name: "chip_ingress.batch.config.info",
1358+
Description: "Batch client configuration info metric",
1359+
Unit: "{info}",
1360+
Data: metricdata.Gauge[int64]{
1361+
DataPoints: []metricdata.DataPoint[int64]{
1362+
{
1363+
Attributes: attribute.NewSet(
1364+
attribute.Int("max_batch_size", 1),
1365+
attribute.Int("message_buffer_size", 10),
1366+
attribute.Int("max_concurrent_sends", 1),
1367+
attribute.Int64("batch_interval_ms", 100),
1368+
attribute.Int64("max_publish_timeout_ms", 5000),
1369+
attribute.Int64("shutdown_timeout_ms", 5000),
1370+
attribute.Bool("clone_event", true),
1371+
attribute.Int("max_grpc_request_size_bytes", 16*1024*1024),
1372+
),
1373+
},
1374+
},
1375+
},
1376+
},
1377+
},
1378+
}, mustScopeMetrics(t, rm, "chipingress/batch_client"), metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
12251379

12261380
reqTotal := mustMetric(t, rm, "chip_ingress.batch.send_requests_total")
12271381
reqSum, ok := reqTotal.Data.(metricdata.Sum[int64])
@@ -1302,6 +1456,17 @@ func mustMetric(t *testing.T, rm metricdata.ResourceMetrics, name string) metric
13021456
return metricdata.Metrics{}
13031457
}
13041458

1459+
func mustScopeMetrics(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.ScopeMetrics {
1460+
t.Helper()
1461+
for _, sm := range rm.ScopeMetrics {
1462+
if sm.Scope.Name == name {
1463+
return sm
1464+
}
1465+
}
1466+
t.Fatalf("scope metrics %q not found", name)
1467+
return metricdata.ScopeMetrics{}
1468+
}
1469+
13051470
func mustInt64SumPointWithAttr(t *testing.T, sum metricdata.Sum[int64], key, want string) metricdata.DataPoint[int64] {
13061471
t.Helper()
13071472
for _, dp := range sum.DataPoints {

0 commit comments

Comments
 (0)