Skip to content

Commit 0e5dbb7

Browse files
Add support to emit workflow metrics (#21817)
* add support for user emitted workflow metrics * add common replacement * go mod tidy * use context in EmitUserMetric * mod tidy * remove replace for common * run mod tidy in sub dirs * add changeset * handle unspecified user workflow metric type * fix lint
1 parent aed0034 commit 0e5dbb7

File tree

21 files changed

+228
-65
lines changed

21 files changed

+228
-65
lines changed

.changeset/slim-eels-buzz.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"chainlink": minor
3+
---
4+
5+
#added Add support for user-emitted CRE workflow metrics via WASM host function emit_metric

core/scripts/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ require (
4747
github.com/smartcontractkit/chain-selectors v1.0.97
4848
github.com/smartcontractkit/chainlink-automation v0.8.1
4949
github.com/smartcontractkit/chainlink-ccip v0.1.1-solana.0.20260317185256-d5f7db87ae70
50-
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260401185621-720567ef9343
50+
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260403093224-b39dab3bfe2a
5151
github.com/smartcontractkit/chainlink-common/keystore v1.0.2
5252
github.com/smartcontractkit/chainlink-data-streams v0.1.13
5353
github.com/smartcontractkit/chainlink-deployments-framework v0.86.3
@@ -508,7 +508,7 @@ require (
508508
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect
509509
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 // indirect
510510
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260203131522-bb8bc5c423b3 // indirect
511-
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260217043601-5cc966896c4f // indirect
511+
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260323124644-faea187e6997 // indirect
512512
github.com/smartcontractkit/chainlink-solana v1.1.2-0.20260331131550-45e89529badc // indirect
513513
github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260217175957-8f1af02c5075 // indirect
514514
github.com/smartcontractkit/chainlink-sui v0.0.0-20260401201231-8b06d312c965 // indirect

core/scripts/go.sum

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/services/workflows/events/emit.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,14 @@ func EmitUserLogs(ctx context.Context, labels map[string]string, logLines []*eve
349349
return multiErr
350350
}
351351

352+
func EmitUserMetric(ctx context.Context, labels map[string]string, metric *eventsv2.WorkflowUserMetric) error {
353+
metric.CreInfo = buildCREMetadataV2(labels)
354+
metric.Workflow = buildWorkflowKeyV2(labels)
355+
metric.Timestamp = time.Now().Format(time.RFC3339)
356+
357+
return emitProtoMessage(ctx, metric)
358+
}
359+
352360
// GenerateExecutionID generates a deterministic execution ID from workflowID and triggerEventID
353361
// hash of (workflowID, triggerEventID)
354362
// Deprecated: Use GenerateExecutionIDWithTriggerIndex instead.
@@ -438,6 +446,9 @@ func emitProtoMessage(ctx context.Context, msg proto.Message) error {
438446
case *eventsv2.WorkflowUserLog:
439447
schema = SchemaUserLogsV2
440448
entity = "workflows.v2." + WorkflowUserLog
449+
case *eventsv2.WorkflowUserMetric:
450+
schema = SchemaUserMetricV2
451+
entity = "workflows.v2." + WorkflowUserMetric
441452
case *eventsv2.WorkflowActivated:
442453
schema = SchemaWorkflowActivatedV2
443454
entity = "workflows.v2." + WorkflowActivated

core/services/workflows/events/types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ const (
2222
TriggerExecutionStarted string = "TriggerExecutionStarted"
2323
// WorkflowUserLog represents user log events
2424
WorkflowUserLog string = "WorkflowUserLog"
25+
// WorkflowUserMetric represents user metric events
26+
WorkflowUserMetric string = "WorkflowUserMetric"
2527
// UserLogs represents user logs events (v1 event type)
2628
UserLogs string = "UserLogs"
2729

@@ -45,6 +47,7 @@ const (
4547
SchemaCapabilityFinishedV2 string = "/cre-events-capability-finished/v2"
4648
SchemaTriggerStartedV2 string = "/cre-events-trigger-started/v2"
4749
SchemaUserLogsV2 string = "/cre-events-user-logs/v2"
50+
SchemaUserMetricV2 string = "/cre-events-user-metric/v2"
4851
SchemaWorkflowActivatedV2 string = "/cre-events-workflow-activated/v2"
4952
SchemaWorkflowPausedV2 string = "/cre-events-workflow-paused/v2"
5053
SchemaWorkflowDeletedV2 string = "/cre-events-workflow-deleted/v2"

core/services/workflows/v2/capability_executor.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk"
1919
"github.com/smartcontractkit/chainlink-protos/cre/go/values"
2020
protoevents "github.com/smartcontractkit/chainlink-protos/workflows/go/events"
21+
eventsv2 "github.com/smartcontractkit/chainlink-protos/workflows/go/v2"
2122

2223
"github.com/smartcontractkit/chainlink/v2/core/platform"
2324
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/events"
@@ -191,17 +192,6 @@ func (c *ExecutionHelper) callCapability(ctx context.Context, request *sdkpb.Cap
191192
},
192193
Config: values.EmptyMap(),
193194
}
194-
gate := c.cfg.LocalLimiters.VaultOrgIDAsSecretOwnerEnabled
195-
if gate == nil {
196-
return nil, errors.New("vault org id gate is nil")
197-
}
198-
enabled, gateErr := gate.Limit(ctx)
199-
if gateErr != nil {
200-
return nil, gateErr
201-
}
202-
if enabled {
203-
capReq.Metadata.OrgID = contexts.CREValue(ctx).Org
204-
}
205195

206196
execLogger.Debug("Executing capability ...")
207197
c.metrics.With(platform.KeyCapabilityID, request.Id).IncrementCapabilityInvocationCounter(ctx)
@@ -274,3 +264,26 @@ func (c *ExecutionHelper) EmitUserLog(msg string) error {
274264
}
275265
return nil
276266
}
267+
268+
const userMetricPrefix = "user_workflow_"
269+
270+
func userMetricTypeSuffix(t eventsv2.UserMetricType) (string, error) {
271+
switch t {
272+
case eventsv2.UserMetricType_USER_METRIC_TYPE_COUNTER:
273+
return "_counter", nil
274+
case eventsv2.UserMetricType_USER_METRIC_TYPE_GAUGE:
275+
return "_gauge", nil
276+
default:
277+
return "", fmt.Errorf("unsupported user metric type: %v", t)
278+
}
279+
}
280+
281+
func (c *ExecutionHelper) EmitUserMetric(ctx context.Context, metric *eventsv2.WorkflowUserMetric) error {
282+
suffix, err := userMetricTypeSuffix(metric.Type)
283+
if err != nil {
284+
return err
285+
}
286+
metric.Name = userMetricPrefix + metric.Name + suffix
287+
loggerLabels := *c.loggerLabels.Load()
288+
return events.EmitUserMetric(ctx, loggerLabels, metric)
289+
}

core/services/workflows/v2/capability_executor_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package v2
33
import (
44
"testing"
55

6+
"github.com/stretchr/testify/assert"
67
"github.com/stretchr/testify/require"
78

89
"github.com/smartcontractkit/chainlink-common/pkg/settings"
910
"github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings"
1011
"github.com/smartcontractkit/chainlink-common/pkg/settings/limits"
1112
"github.com/smartcontractkit/chainlink-protos/cre/go/sdk"
13+
eventsv2 "github.com/smartcontractkit/chainlink-protos/workflows/go/v2"
1214
"github.com/smartcontractkit/chainlink/v2/core/logger"
1315
)
1416

@@ -51,3 +53,91 @@ func TestExecutionHelper_ConfidentialHTTPPerWorkflowLimit(t *testing.T) {
5153
_, err = exec.CallCapability(t.Context(), req)
5254
require.Error(t, err, "expected CallCapability to fail when per-workflow confidential-http call limit is exceeded")
5355
}
56+
57+
func TestUserMetricTypeSuffix(t *testing.T) {
58+
t.Parallel()
59+
60+
tests := []struct {
61+
name string
62+
metricType eventsv2.UserMetricType
63+
wantSuffix string
64+
wantErr bool
65+
}{
66+
{
67+
name: "counter",
68+
metricType: eventsv2.UserMetricType_USER_METRIC_TYPE_COUNTER,
69+
wantSuffix: "_counter",
70+
},
71+
{
72+
name: "gauge",
73+
metricType: eventsv2.UserMetricType_USER_METRIC_TYPE_GAUGE,
74+
wantSuffix: "_gauge",
75+
},
76+
{
77+
name: "unspecified",
78+
metricType: eventsv2.UserMetricType_USER_METRIC_TYPE_UNSPECIFIED,
79+
wantErr: true,
80+
},
81+
{
82+
name: "unknown numeric value",
83+
metricType: eventsv2.UserMetricType(999),
84+
wantErr: true,
85+
},
86+
}
87+
88+
for _, tc := range tests {
89+
t.Run(tc.name, func(t *testing.T) {
90+
t.Parallel()
91+
suffix, err := userMetricTypeSuffix(tc.metricType)
92+
if tc.wantErr {
93+
require.Error(t, err)
94+
assert.Contains(t, err.Error(), "unsupported user metric type")
95+
} else {
96+
require.NoError(t, err)
97+
assert.Equal(t, tc.wantSuffix, suffix)
98+
}
99+
})
100+
}
101+
}
102+
103+
func TestUserMetricNameFormatting(t *testing.T) {
104+
t.Parallel()
105+
106+
tests := []struct {
107+
name string
108+
metricName string
109+
metricType eventsv2.UserMetricType
110+
wantName string
111+
}{
112+
{
113+
name: "counter metric gets prefix and suffix",
114+
metricName: "price",
115+
metricType: eventsv2.UserMetricType_USER_METRIC_TYPE_COUNTER,
116+
wantName: "user_workflow_price_counter",
117+
},
118+
{
119+
name: "gauge metric gets prefix and suffix",
120+
metricName: "temperature",
121+
metricType: eventsv2.UserMetricType_USER_METRIC_TYPE_GAUGE,
122+
wantName: "user_workflow_temperature_gauge",
123+
},
124+
}
125+
126+
for _, tc := range tests {
127+
t.Run(tc.name, func(t *testing.T) {
128+
t.Parallel()
129+
suffix, err := userMetricTypeSuffix(tc.metricType)
130+
require.NoError(t, err)
131+
got := userMetricPrefix + tc.metricName + suffix
132+
assert.Equal(t, tc.wantName, got)
133+
})
134+
}
135+
}
136+
137+
func TestUserMetricUnsupportedTypeRejected(t *testing.T) {
138+
t.Parallel()
139+
140+
_, err := userMetricTypeSuffix(eventsv2.UserMetricType_USER_METRIC_TYPE_UNSPECIFIED)
141+
require.Error(t, err)
142+
assert.Contains(t, err.Error(), "unsupported user metric type")
143+
}

core/services/workflows/v2/config.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ type EngineLimiters struct {
9898
ConfidentialHTTPCalls limits.BoundLimiter[int]
9999
SecretsCalls limits.BoundLimiter[int]
100100

101+
UserMetricEnabled limits.GateLimiter
102+
UserMetricPayload limits.BoundLimiter[config.Size]
103+
UserMetricNameLength limits.BoundLimiter[int]
104+
UserMetricLabelsPerMetric limits.BoundLimiter[int]
105+
UserMetricLabelValueLength limits.BoundLimiter[int]
106+
101107
ExecutionTimestampsEnabled limits.GateLimiter
102108
VaultOrgIDAsSecretOwnerEnabled limits.GateLimiter
103109
}
@@ -193,6 +199,26 @@ func (l *EngineLimiters) init(lf limits.Factory, cfgFn func(*cresettings.Workflo
193199
if err != nil {
194200
return
195201
}
202+
l.UserMetricEnabled, err = limits.MakeGateLimiter(lf, cfg.UserMetricEnabled)
203+
if err != nil {
204+
return
205+
}
206+
l.UserMetricPayload, err = limits.MakeUpperBoundLimiter(lf, cfg.UserMetricPayloadLimit)
207+
if err != nil {
208+
return
209+
}
210+
l.UserMetricNameLength, err = limits.MakeUpperBoundLimiter(lf, cfg.UserMetricNameLengthLimit)
211+
if err != nil {
212+
return
213+
}
214+
l.UserMetricLabelsPerMetric, err = limits.MakeUpperBoundLimiter(lf, cfg.UserMetricLabelsPerMetric)
215+
if err != nil {
216+
return
217+
}
218+
l.UserMetricLabelValueLength, err = limits.MakeUpperBoundLimiter(lf, cfg.UserMetricLabelValueLength)
219+
if err != nil {
220+
return
221+
}
196222
l.ChainAllowed, err = limits.MakeGateLimiter(lf, cfg.ChainAllowed)
197223
if err != nil {
198224
return
@@ -254,6 +280,11 @@ func (l *EngineLimiters) EvictWorkflow(workflowID string) error {
254280
l.CapabilityCallTime,
255281
l.LogEvent,
256282
l.LogLine,
283+
l.UserMetricEnabled,
284+
l.UserMetricPayload,
285+
l.UserMetricNameLength,
286+
l.UserMetricLabelsPerMetric,
287+
l.UserMetricLabelValueLength,
257288
l.ChainAllowed,
258289
l.ChainWriteTargets,
259290
l.ChainReadCalls,
@@ -290,6 +321,11 @@ func (l *EngineLimiters) Close() error {
290321
l.CapabilityCallTime,
291322
l.LogEvent,
292323
l.LogLine,
324+
l.UserMetricEnabled,
325+
l.UserMetricPayload,
326+
l.UserMetricNameLength,
327+
l.UserMetricLabelsPerMetric,
328+
l.UserMetricLabelValueLength,
293329
l.ChainAllowed,
294330
l.ChainWriteTargets,
295331
l.ChainReadCalls,

core/services/workflows/v2/disallowed_execution_helper.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
1010
sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk"
1111
protoevents "github.com/smartcontractkit/chainlink-protos/workflows/go/events"
12+
eventsv2 "github.com/smartcontractkit/chainlink-protos/workflows/go/v2"
1213
)
1314

1415
type disallowedExecutionHelper struct {
@@ -49,3 +50,7 @@ func (d disallowedExecutionHelper) EmitUserLog(msg string) error {
4950
}
5051
return nil
5152
}
53+
54+
func (d disallowedExecutionHelper) EmitUserMetric(_ context.Context, _ *eventsv2.WorkflowUserMetric) error {
55+
return errors.New("metric emission is not allowed during this execution")
56+
}

deployment/go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ require (
4444
github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20260224214816-cb23ec38649f
4545
github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20260310183131-8d0f0e383288
4646
github.com/smartcontractkit/chainlink-ccip/deployment v0.0.0-20260317175207-e9ff89561326
47-
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260401185621-720567ef9343
47+
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260403093224-b39dab3bfe2a
4848
github.com/smartcontractkit/chainlink-common/keystore v1.0.2
4949
github.com/smartcontractkit/chainlink-deployments-framework v0.86.3
5050
github.com/smartcontractkit/chainlink-evm v0.3.4-0.20260330133421-5151ea0c3b05
@@ -233,7 +233,7 @@ require (
233233
github.com/felixge/httpsnoop v1.0.4 // indirect
234234
github.com/ferranbt/fastssz v0.1.4 // indirect
235235
github.com/fsnotify/fsnotify v1.9.0 // indirect
236-
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
236+
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
237237
github.com/gabriel-vasile/mimetype v1.4.13 // indirect
238238
github.com/gagliardetto/treeout v0.1.4 // indirect
239239
github.com/gagliardetto/utilz v0.1.3 // indirect
@@ -440,7 +440,7 @@ require (
440440
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect
441441
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 // indirect
442442
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260203131522-bb8bc5c423b3 // indirect
443-
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260217043601-5cc966896c4f // indirect
443+
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260323124644-faea187e6997 // indirect
444444
github.com/smartcontractkit/chainlink-testing-framework/parrot v0.6.2 // indirect
445445
github.com/smartcontractkit/chainlink-testing-framework/seth v1.51.5 // indirect
446446
github.com/smartcontractkit/chainlink-tron/relayer v0.0.11-0.20260218133534-cbd44da2856b // indirect

0 commit comments

Comments
 (0)