diff --git a/core/services/workflows/monitoring/monitoring.go b/core/services/workflows/monitoring/monitoring.go index 45ed778ec3f..2d20c038f0f 100644 --- a/core/services/workflows/monitoring/monitoring.go +++ b/core/services/workflows/monitoring/monitoring.go @@ -47,9 +47,10 @@ type EngineMetrics struct { workflowMissingMeteringReport metric.Int64Counter workflowMeteringMode metric.Int64Gauge - workflowExecutionFailedCounter metric.Int64Counter - workflowExecutionStartedCounter metric.Int64Counter - workflowExecutionSucceededCounter metric.Int64Counter + workflowExecutionFailedCounter metric.Int64Counter + workflowExecutionStartedCounter metric.Int64Counter + workflowExecutionSucceededCounter metric.Int64Counter + workflowExecutionCapabilityFailureCounter metric.Int64Counter getSecretsDuration metric.Int64Histogram @@ -159,6 +160,11 @@ func InitMonitoringResources() (em *EngineMetrics, err error) { return nil, fmt.Errorf("failed to register workflow execution failed counter: %w", err) } + em.workflowExecutionCapabilityFailureCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_execution_failed_capability_error_count") + if err != nil { + return nil, fmt.Errorf("failed to register workflow execution failed capability error counter: %w", err) + } + em.workflowExecutionSucceededCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_execution_succeeded_count") if err != nil { return nil, fmt.Errorf("failed to register workflow execution succeeded counter: %w", err) @@ -471,6 +477,11 @@ func (c WorkflowsMetricLabeler) IncrementWorkflowExecutionFailedCounter(ctx cont c.em.workflowExecutionFailedCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) } +func (c WorkflowsMetricLabeler) IncrementWorkflowExecutionCapabilityFailureCounter(ctx context.Context) { + otelLabels := beholder.OtelAttributes(c.Labels).AsStringAttributes() + c.em.workflowExecutionCapabilityFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) +} + func (c WorkflowsMetricLabeler) IncrementWorkflowExecutionSucceededCounter(ctx context.Context) { otelLabels := beholder.OtelAttributes(c.Labels).AsStringAttributes() c.em.workflowExecutionSucceededCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...)) diff --git a/core/services/workflows/v2/capability_executor.go b/core/services/workflows/v2/capability_executor.go index 244e6bbb72a..be43eefcfd1 100644 --- a/core/services/workflows/v2/capability_executor.go +++ b/core/services/workflows/v2/capability_executor.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sort" "strconv" "sync" "time" @@ -35,10 +36,11 @@ type ExecutionHelper struct { TimeProvider SecretsFetcher - chainAllowed limits.GateLimiter - callLimiters map[capCall]limits.BoundLimiter[int] - mu sync.Mutex - callCounts map[limits.Limiter[int]]int + chainAllowed limits.GateLimiter + callLimiters map[capCall]limits.BoundLimiter[int] + mu sync.Mutex + callCounts map[limits.Limiter[int]]int + failedCapabilityIDs sync.Map } func (c *ExecutionHelper) initLimiters(limiters *EngineLimiters) { @@ -208,6 +210,7 @@ func (c *ExecutionHelper) callCapability(ctx context.Context, request *sdkpb.Cap executionDuration := time.Since(executionStart) c.metrics.With(platform.KeyCapabilityID, request.Id).UpdateCapabilityExecutionDurationHistogram(ctx, int64(executionDuration.Seconds())) if err != nil { + c.failedCapabilityIDs.Store(request.Id, struct{}{}) var capabilityError caperrors.Error if errors.As(err, &capabilityError) { if capabilityError.Origin() == caperrors.OriginUser { @@ -247,6 +250,16 @@ func (c *ExecutionHelper) callCapability(ctx context.Context, request *sdkpb.Cap }, nil } +func (c *ExecutionHelper) FailedCapabilityIDs() []string { + var ids []string + c.failedCapabilityIDs.Range(func(key, _ any) bool { + ids = append(ids, key.(string)) + return true + }) + sort.Strings(ids) + return ids +} + func (c *ExecutionHelper) GetWorkflowExecutionID() string { return c.WorkflowExecutionID } diff --git a/core/services/workflows/v2/capability_executor_test.go b/core/services/workflows/v2/capability_executor_test.go index bf2094b2873..026736d826d 100644 --- a/core/services/workflows/v2/capability_executor_test.go +++ b/core/services/workflows/v2/capability_executor_test.go @@ -3,6 +3,7 @@ package v2 import ( "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-common/pkg/settings" @@ -51,3 +52,32 @@ func TestExecutionHelper_ConfidentialHTTPPerWorkflowLimit(t *testing.T) { _, err = exec.CallCapability(t.Context(), req) require.Error(t, err, "expected CallCapability to fail when per-workflow confidential-http call limit is exceeded") } + +func TestExecutionHelper_FailedCapabilityIDs(t *testing.T) { + t.Parallel() + + t.Run("empty when no failures", func(t *testing.T) { + exec := &ExecutionHelper{} + assert.Empty(t, exec.FailedCapabilityIDs()) + }) + + t.Run("records single failed capability", func(t *testing.T) { + exec := &ExecutionHelper{} + exec.failedCapabilityIDs.Store("evm@1.0.0", struct{}{}) + assert.Equal(t, []string{"evm@1.0.0"}, exec.FailedCapabilityIDs()) + }) + + t.Run("deduplicates same capability", func(t *testing.T) { + exec := &ExecutionHelper{} + exec.failedCapabilityIDs.Store("evm@1.0.0", struct{}{}) + exec.failedCapabilityIDs.Store("evm@1.0.0", struct{}{}) + assert.Equal(t, []string{"evm@1.0.0"}, exec.FailedCapabilityIDs()) + }) + + t.Run("records multiple failed capabilities sorted", func(t *testing.T) { + exec := &ExecutionHelper{} + exec.failedCapabilityIDs.Store("evm@1.0.0", struct{}{}) + exec.failedCapabilityIDs.Store("confidential-http@1.0.0", struct{}{}) + assert.Equal(t, []string{"confidential-http@1.0.0", "evm@1.0.0"}, exec.FailedCapabilityIDs()) + }) +} diff --git a/core/services/workflows/v2/engine.go b/core/services/workflows/v2/engine.go index 61be2b98d9c..9b223cf895e 100644 --- a/core/services/workflows/v2/engine.go +++ b/core/services/workflows/v2/engine.go @@ -8,6 +8,7 @@ import ( "runtime" "runtime/debug" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -837,9 +838,13 @@ func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueue if len(result.GetError()) > 0 { executionStatus = store.StatusErrored execErr = errors.New(result.GetError()) + failedCapIDs := execHelper.FailedCapabilityIDs() e.metrics.UpdateWorkflowErrorDurationHistogram(ctx, int64(executionDuration.Seconds())) e.metrics.With("workflowID", e.cfg.WorkflowID, "workflowName", e.cfg.WorkflowName.String()).IncrementWorkflowExecutionFailedCounter(ctx) - executionLogger.Errorw("Workflow execution failed", "status", executionStatus, "durationMs", executionDuration.Milliseconds(), "error", result.GetError()) + for _, capID := range failedCapIDs { + e.metrics.With("workflowID", e.cfg.WorkflowID, "workflowName", e.cfg.WorkflowName.String(), platform.KeyCapabilityID, capID).IncrementWorkflowExecutionCapabilityFailureCounter(ctx) + } + executionLogger.Errorw("Workflow execution failed", "status", executionStatus, "durationMs", executionDuration.Milliseconds(), platform.KeyCapabilityID, strings.Join(failedCapIDs, ","), "error", result.GetError()) return }