Skip to content

Commit 4a9038d

Browse files
authored
cre-3444 (#22204)
* cre-3444: execution drain on wf delete
* cre-3444: race case cover for deletion retry during wf reactivate
* cre-3444: metrics
* cre-3444: various code improvements
* cre-3444: IsDraining() refactor into Drain() (bool)
* cre-3444: refactor to avoid side-effect pattern, use phase instead
* cre-3444: remove timing from test; deterministic flow
* cre-3444: minor improvement
1 parent cef24ee commit 4a9038d

9 files changed

Lines changed: 678 additions & 15 deletions

File tree

core/services/workflows/monitoring/monitoring.go

Lines changed: 28 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,9 @@ type EngineMetrics struct {
6363

6464
triggerEventEnqueuedCounter metric.Int64Counter
6565
triggerEventEnqueueDroppedCounter metric.Int64Counter
66+
triggerEventDequeueDroppedCounter metric.Int64Counter
6667
triggerEventExpiredCounter metric.Int64Counter
68+
triggerExecutionDeduplicatedCounter metric.Int64Counter
6769
triggerEventQueueWaitSeconds metric.Float64Histogram
6870
triggerQueueToExecutionStartSeconds metric.Float64Histogram
6971
triggerPayloadBytes metric.Int64Histogram
@@ -301,6 +303,14 @@ func InitMonitoringResources() (em *EngineMetrics, err error) {
301303
return nil, fmt.Errorf("failed to register trigger event enqueue dropped counter: %w", err)
302304
}
303305

306+
em.triggerEventDequeueDroppedCounter, err = beholder.GetMeter().Int64Counter(
307+
"platform_engine_trigger_event_dequeue_dropped_total",
308+
metric.WithDescription("Trigger events dropped after dequeue before execution (e.g. engine draining)"),
309+
)
310+
if err != nil {
311+
return nil, fmt.Errorf("failed to register trigger event dequeue dropped counter: %w", err)
312+
}
313+
304314
em.triggerEventExpiredCounter, err = beholder.GetMeter().Int64Counter(
305315
"platform_engine_trigger_event_expired_total",
306316
metric.WithDescription("Trigger events dropped for exceeding max queue wait time"),
@@ -309,6 +319,14 @@ func InitMonitoringResources() (em *EngineMetrics, err error) {
309319
return nil, fmt.Errorf("failed to register trigger event expired counter: %w", err)
310320
}
311321

322+
em.triggerExecutionDeduplicatedCounter, err = beholder.GetMeter().Int64Counter(
323+
"platform_engine_trigger_execution_deduplicated_total",
324+
metric.WithDescription("Trigger events skipped because execution ID already exists"),
325+
)
326+
if err != nil {
327+
return nil, fmt.Errorf("failed to register trigger execution deduplicated counter: %w", err)
328+
}
329+
312330
em.triggerEventQueueWaitSeconds, err = beholder.GetMeter().Float64Histogram(
313331
"platform_engine_trigger_event_queue_wait_seconds",
314332
metric.WithDescription("Time from enqueue timestamp until dequeue from the trigger event queue"),
@@ -626,11 +644,21 @@ func (c WorkflowsMetricLabeler) IncrementTriggerEventEnqueueDroppedCounter(ctx c
626644
c.em.triggerEventEnqueueDroppedCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
627645
}
628646

647+
// IncrementTriggerEventDequeueDroppedCounter adds 1 to the
// platform_engine_trigger_event_dequeue_dropped_total counter (trigger events
// dropped after dequeue but before execution, e.g. while the engine is
// draining). The labeler's Labels are attached as OpenTelemetry attributes.
func (c WorkflowsMetricLabeler) IncrementTriggerEventDequeueDroppedCounter(ctx context.Context) {
648+
otelLabels := beholder.OtelAttributes(c.Labels).AsStringAttributes()
649+
c.em.triggerEventDequeueDroppedCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
650+
}
651+
629652
// IncrementTriggerEventExpiredCounter adds 1 to the
// platform_engine_trigger_event_expired_total counter (trigger events dropped
// for exceeding the max queue wait time). The labeler's Labels are attached as
// OpenTelemetry attributes.
func (c WorkflowsMetricLabeler) IncrementTriggerEventExpiredCounter(ctx context.Context) {
630653
otelLabels := beholder.OtelAttributes(c.Labels).AsStringAttributes()
631654
c.em.triggerEventExpiredCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
632655
}
633656

657+
// IncrementTriggerExecutionDeduplicatedCounter adds 1 to the
// platform_engine_trigger_execution_deduplicated_total counter (trigger events
// skipped because the execution ID already exists). The labeler's Labels are
// attached as OpenTelemetry attributes.
func (c WorkflowsMetricLabeler) IncrementTriggerExecutionDeduplicatedCounter(ctx context.Context) {
658+
otelLabels := beholder.OtelAttributes(c.Labels).AsStringAttributes()
659+
c.em.triggerExecutionDeduplicatedCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
660+
}
661+
634662
func (c WorkflowsMetricLabeler) RecordTriggerEventQueueWaitSeconds(ctx context.Context, waitSeconds float64) {
635663
otelLabels := beholder.OtelAttributes(c.Labels).AsStringAttributes()
636664
c.em.triggerEventQueueWaitSeconds.Record(ctx, waitSeconds, metric.WithAttributes(otelLabels...))

core/services/workflows/syncer/v2/handler.go

Lines changed: 74 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,14 @@ type ORM interface {
5353
// creation since they don't support async initialization hooks.
5454
type engineFactoryFn func(ctx context.Context, wfid string, owner string, name types.WorkflowName, tag string, config []byte, binary []byte, initDone chan<- error) (services.Service, error)
5555

56+
// DrainableService is the optional capability the event handler looks for (via
// a type assertion on the registered engine's Service) before closing an
// engine on workflow deletion.
//
//   - Drain requests a drain; the handler treats a true result as "this call
//     started the drain" and only then logs and counts a drain start.
//   - ActiveExecutions is the number of executions still running; while it is
//     greater than zero the handler defers deletion with ErrDrainInProgress.
//   - DrainStartedAt returns the drain start time and whether a drain has been
//     started; the handler uses the bool as its "is draining" check and the
//     time to record how long the drain took.
type DrainableService interface {
57+
Drain() bool
58+
ActiveExecutions() int32
59+
DrainStartedAt() (time.Time, bool)
60+
}
61+
62+
// ErrDrainInProgress is wrapped and returned by workflowDeletedEvent when a
// draining engine still has active executions. Handle detects it with
// errors.Is and logs the event as deferred instead of failed, while still
// returning the error to its caller.
var ErrDrainInProgress = errors.New("drain in progress")
63+
5664
// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding method that handles the event.
5765
type eventHandler struct {
5866
services.Service
@@ -95,6 +103,8 @@ type eventHandler struct {
95103
shardingEnabled bool
96104
myShardID uint32
97105
shardRoutingSteady *shardownership.SteadySignal
106+
107+
metrics *metrics
98108
}
99109

100110
func WithEngineRegistry(er *EngineRegistry) func(*eventHandler) {
@@ -245,6 +255,11 @@ func NewEventHandler(
245255
workflowDonSubscriber: workflowDonSubscriber,
246256
tracer: noop.NewTracerProvider().Tracer(""), // default to noop, enable via WithDebugMode
247257
}
258+
metricsInst, metricsErr := newMetrics()
259+
if metricsErr != nil {
260+
return nil, fmt.Errorf("new metrics: %w", metricsErr)
261+
}
262+
eh.metrics = metricsInst
248263
eh.engineFactory = eh.engineFactoryFn
249264
for _, o := range opts {
250265
o(eh)
@@ -363,8 +378,12 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
363378
}
364379
}()
365380

366-
if err := h.workflowPausedEvent(ctx, payload); err != nil {
367-
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow paused event: %v", err), h.lggr)
381+
if err = h.workflowPausedEvent(ctx, payload); err != nil {
382+
if errors.Is(err, ErrDrainInProgress) {
383+
logCustMsg(ctx, cma, fmt.Sprintf("workflow pause deferred: %v", err), h.lggr)
384+
} else {
385+
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow paused event: %v", err), h.lggr)
386+
}
368387
return err
369388
}
370389

@@ -417,8 +436,12 @@ func (h *eventHandler) Handle(ctx context.Context, event Event) error {
417436
}
418437
}()
419438

420-
if herr := h.workflowDeletedEvent(ctx, payload); herr != nil {
421-
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow deleted event: %v", herr), h.lggr)
439+
if herr = h.workflowDeletedEvent(ctx, payload); herr != nil {
440+
if errors.Is(herr, ErrDrainInProgress) {
441+
logCustMsg(ctx, cma, fmt.Sprintf("workflow deletion deferred: %v", herr), h.lggr)
442+
} else {
443+
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow deleted event: %v", herr), h.lggr)
444+
}
422445
return herr
423446
}
424447

@@ -497,12 +520,24 @@ func (h *eventHandler) workflowRegisteredEvent(
497520
// We know we need an engine, let's make sure that there isn't already one running for this workflow ID.
498521
prevEngine, ok := h.engineRegistry.Get(payload.WorkflowID)
499522
if ok && prevEngine.Ready() == nil && spec.Status == job.WorkflowSpecStatusActive {
523+
drainable, isDrainable := prevEngine.Service.(DrainableService)
524+
isDraining := false
525+
if isDrainable {
526+
_, isDraining = drainable.DrainStartedAt()
527+
}
528+
if isDrainable && isDraining {
529+
h.lggr.Infow("engine is draining, replacing with a new engine", "workflowID", payload.WorkflowID.Hex())
530+
}
531+
500532
// This is the happy-path, we're done.
501-
return nil
533+
if !isDrainable || !isDraining {
534+
return nil
535+
}
502536
}
503537

504538
// Any other case ->
505539
// - engine in registry, but service isn't running
540+
// - engine in registry and service is running, but it's draining and must be replaced
506541
// - state isn't active
507542
// Let's clean up and recreate
508543

@@ -684,9 +719,31 @@ func (h *eventHandler) workflowDeletedEvent(
684719
// closed.
685720
// At the same time, popping the engine should occur last to allow deletes to be retried if any of the
686721
// prior steps fail.
722+
workflowID := payload.WorkflowID.Hex()
687723
e, ok := h.engineRegistry.Get(payload.WorkflowID)
724+
var drainable DrainableService
725+
var isDrainable bool
688726
if ok {
689-
if innerErr := e.Close(); innerErr != nil {
727+
if drainable, isDrainable = e.Service.(DrainableService); isDrainable {
728+
if started := drainable.Drain(); started {
729+
h.lggr.Infow("initiated drain for workflow engine", "workflowID", workflowID)
730+
if h.metrics != nil {
731+
h.metrics.incrementDrainStarted(ctx)
732+
}
733+
}
734+
735+
if active := drainable.ActiveExecutions(); active > 0 {
736+
if h.metrics != nil {
737+
h.metrics.incrementDeleteDeferred(ctx, "drain_in_progress")
738+
}
739+
h.lggr.Infow("workflow deletion deferred: active executions still running",
740+
"workflowID", workflowID,
741+
"activeExecutions", active)
742+
return fmt.Errorf("%w: %d active executions still running", ErrDrainInProgress, active)
743+
}
744+
}
745+
746+
if innerErr := e.Close(); innerErr != nil && !errors.Is(innerErr, services.ErrAlreadyStopped) {
690747
return fmt.Errorf("failed to close workflow engine: %w", innerErr)
691748
}
692749
}
@@ -699,7 +756,17 @@ func (h *eventHandler) workflowDeletedEvent(
699756
if errors.Is(err, ErrNotFound) {
700757
return nil
701758
}
702-
return err
759+
if err != nil {
760+
return err
761+
}
762+
763+
if isDrainable {
764+
startedAt, exists := drainable.DrainStartedAt()
765+
if exists && h.metrics != nil {
766+
h.metrics.recordDrainCompleted(ctx, time.Since(startedAt))
767+
}
768+
}
769+
return nil
703770
}
704771

705772
// tryEngineCleanup attempts to stop the workflow engine for the given workflow ID. Does nothing if the

core/services/workflows/syncer/v2/handler_test.go

Lines changed: 154 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,9 @@ import (
88
"fmt"
99
"math/big"
1010
"net"
11+
"sync/atomic"
1112
"testing"
13+
"time"
1214

1315
"github.com/jonboulle/clockwork"
1416
"google.golang.org/grpc"
@@ -17,6 +19,7 @@ import (
1719
"github.com/stretchr/testify/assert"
1820
"github.com/stretchr/testify/mock"
1921
"github.com/stretchr/testify/require"
22+
"go.opentelemetry.io/otel/trace/noop"
2023
"google.golang.org/protobuf/proto"
2124

2225
"github.com/smartcontractkit/chainlink-common/pkg/beholder/beholdertest"
@@ -106,6 +109,41 @@ func (m *mockEngine) HealthReport() map[string]error { return nil }
106109

107110
func (m *mockEngine) Name() string { return "mockEngine" }
108111

112+
// mockDrainableEngine is a test double that embeds mockEngine and adds the
// DrainableService methods. All state is held in atomics:
// draining flips to true on the first Drain call, activeExecutions is the value
// reported by ActiveExecutions, drainCalls and closeCalls count invocations of
// Drain and Close, and drainStartedAtNs holds the Unix-nanosecond time of the
// first Drain call (0 means no drain has started).
type mockDrainableEngine struct {
113+
mockEngine
114+
draining atomic.Bool
115+
activeExecutions atomic.Int32
116+
drainCalls atomic.Int32
117+
closeCalls atomic.Int32
118+
drainStartedAtNs atomic.Int64
119+
}
120+
121+
// Drain marks the mock as draining and reports whether this call was the one
// that flipped the flag from false to true. Every call increments drainCalls;
// the start timestamp is only written while it is still 0, so later calls do
// not overwrite it.
// NOTE(review): the Store(true) after CompareAndSwap(false, true) looks
// redundant (the flag is already true on either CAS outcome) — confirm and
// consider removing.
func (m *mockDrainableEngine) Drain() bool {
122+
started := m.draining.CompareAndSwap(false, true)
123+
m.draining.Store(true)
124+
m.drainCalls.Add(1)
125+
m.drainStartedAtNs.CompareAndSwap(0, time.Now().UnixNano())
126+
return started
127+
}
128+
129+
// ActiveExecutions returns the current value of the activeExecutions counter,
// which tests set directly to simulate in-flight executions.
func (m *mockDrainableEngine) ActiveExecutions() int32 {
130+
return m.activeExecutions.Load()
131+
}
132+
133+
// DrainStartedAt returns the time recorded by the first Drain call and true,
// or the zero time and false when no timestamp has been stored yet (the stored
// nanosecond value is still 0).
func (m *mockDrainableEngine) DrainStartedAt() (time.Time, bool) {
134+
ns := m.drainStartedAtNs.Load()
135+
if ns == 0 {
136+
return time.Time{}, false
137+
}
138+
139+
return time.Unix(0, ns), true
140+
}
141+
142+
// Close counts the call in closeCalls and returns the CloseErr configured on
// the embedded mockEngine, so tests can assert both whether Close was invoked
// and how the handler reacts to its error.
func (m *mockDrainableEngine) Close() error {
143+
m.closeCalls.Add(1)
144+
return m.CloseErr
145+
}
146+
109147
// mockEngineFactory returns a standard mock engine factory for tests.
110148
// It sends nil to initDone to signal successful initialization.
111149
func mockEngineFactory(ctx context.Context, wfid string, owner string, name types.WorkflowName, tag string, config []byte, binary []byte, initDone chan<- error) (services.Service, error) {
@@ -1268,6 +1306,122 @@ func Test_workflowDeletedHandler(t *testing.T) {
12681306
})
12691307
}
12701308

1309+
// stubWorkflowArtifactsStore is a minimal in-memory stand-in for the handler's
// workflow artifacts store. spec is what GetWorkflowSpec returns (nil yields an
// error), deleteErr is what DeleteWorkflowArtifacts returns, and deleteCalls
// counts DeleteWorkflowArtifacts invocations.
type stubWorkflowArtifactsStore struct {
1310+
spec *job.WorkflowSpec
1311+
deleteErr error
1312+
deleteCalls atomic.Int32
1313+
}
1314+
1315+
// FetchWorkflowArtifacts is a no-op stub: it ignores its arguments and returns
// nil for both byte slices and a nil error.
func (s *stubWorkflowArtifactsStore) FetchWorkflowArtifacts(context.Context, string, string, string) ([]byte, []byte, error) {
1316+
return nil, nil, nil
1317+
}
1318+
1319+
// GetWorkflowSpec returns the configured spec regardless of the requested ID,
// or a plain "not found" error when no spec has been set on the stub.
func (s *stubWorkflowArtifactsStore) GetWorkflowSpec(context.Context, string) (*job.WorkflowSpec, error) {
1320+
if s.spec == nil {
1321+
return nil, errors.New("not found")
1322+
}
1323+
return s.spec, nil
1324+
}
1325+
1326+
// UpsertWorkflowSpec is a no-op stub that always reports success with the
// fixed value 1 and does not store the given spec.
func (s *stubWorkflowArtifactsStore) UpsertWorkflowSpec(context.Context, *job.WorkflowSpec) (int64, error) {
1327+
return 1, nil
1328+
}
1329+
1330+
// DeleteWorkflowArtifacts counts the call in deleteCalls and returns the
// configured deleteErr; nothing is actually deleted.
func (s *stubWorkflowArtifactsStore) DeleteWorkflowArtifacts(context.Context, string) error {
1331+
s.deleteCalls.Add(1)
1332+
return s.deleteErr
1333+
}
1334+
1335+
// DeleteWorkflowArtifactsBatch is a no-op stub that always returns nil. Unlike
// DeleteWorkflowArtifacts it does not touch deleteCalls.
func (s *stubWorkflowArtifactsStore) DeleteWorkflowArtifactsBatch(context.Context, []string) error {
1336+
return nil
1337+
}
1338+
1339+
// Test_workflowDeletedEvent_DrainInProgress checks that deleting a workflow
// whose engine still reports active executions is deferred. With two active
// executions on the mock, workflowDeletedEvent must return an error matching
// ErrDrainInProgress, call Drain exactly once, not call Close, not delete
// artifacts, and leave the engine in the registry.
// The handler is built as a bare struct, so its metrics field is nil.
func Test_workflowDeletedEvent_DrainInProgress(t *testing.T) {
1340+
t.Parallel()
1341+
1342+
workflowID := types.WorkflowID{1}
1343+
drainable := &mockDrainableEngine{}
1344+
drainable.activeExecutions.Store(2)
1345+
artifactStore := &stubWorkflowArtifactsStore{}
1346+
registry := NewEngineRegistry()
1347+
require.NoError(t, registry.Add(workflowID, "test-source", drainable))
1348+
1349+
h := &eventHandler{
1350+
lggr: logger.TestLogger(t),
1351+
engineRegistry: registry,
1352+
workflowArtifactsStore: artifactStore,
1353+
}
1354+
1355+
err := h.workflowDeletedEvent(t.Context(), WorkflowDeletedEvent{WorkflowID: workflowID})
1356+
require.Error(t, err)
1357+
require.ErrorIs(t, err, ErrDrainInProgress)
1358+
assert.Equal(t, int32(1), drainable.drainCalls.Load())
1359+
assert.Equal(t, int32(0), drainable.closeCalls.Load())
1360+
assert.Equal(t, int32(0), artifactStore.deleteCalls.Load())
1361+
_, ok := registry.Get(workflowID)
1362+
assert.True(t, ok)
1363+
}
1364+
1365+
// Test_workflowDeletedEvent_IgnoresErrAlreadyStopped checks that an engine
// whose Close returns services.ErrAlreadyStopped does not fail deletion. The
// mock has no active executions, so the drain check does not defer; the test
// expects no error, exactly one Close call, exactly one artifact deletion, and
// the engine to be gone from the registry afterwards.
func Test_workflowDeletedEvent_IgnoresErrAlreadyStopped(t *testing.T) {
1366+
t.Parallel()
1367+
1368+
workflowID := types.WorkflowID{2}
1369+
drainable := &mockDrainableEngine{}
1370+
drainable.CloseErr = services.ErrAlreadyStopped
1371+
artifactStore := &stubWorkflowArtifactsStore{}
1372+
registry := NewEngineRegistry()
1373+
require.NoError(t, registry.Add(workflowID, "test-source", drainable))
1374+
1375+
h := &eventHandler{
1376+
lggr: logger.TestLogger(t),
1377+
engineRegistry: registry,
1378+
workflowArtifactsStore: artifactStore,
1379+
}
1380+
1381+
err := h.workflowDeletedEvent(t.Context(), WorkflowDeletedEvent{WorkflowID: workflowID})
1382+
require.NoError(t, err)
1383+
assert.Equal(t, int32(1), drainable.closeCalls.Load())
1384+
assert.Equal(t, int32(1), artifactStore.deleteCalls.Load())
1385+
_, ok := registry.Get(workflowID)
1386+
assert.False(t, ok)
1387+
}
1388+
1389+
// Test_workflowRegisteredEvent_DrainingEngineNotTreatedAsHealthy checks that a
// registered engine which has already started draining is not accepted as the
// "engine already running" happy path when the workflow is registered again as
// active. Drain is called up front so DrainStartedAt reports true, and Close is
// configured to fail with assert.AnError. The failure makes the replacement
// attempt observable: the test expects an error containing
// "could not clean up old engine" and exactly one Close call on the old engine.
func Test_workflowRegisteredEvent_DrainingEngineNotTreatedAsHealthy(t *testing.T) {
1390+
t.Parallel()
1391+
1392+
workflowID := types.WorkflowID{3}
1393+
drainable := &mockDrainableEngine{
1394+
mockEngine: mockEngine{
1395+
CloseErr: assert.AnError,
1396+
},
1397+
}
1398+
require.True(t, drainable.Drain())
1399+
1400+
registry := NewEngineRegistry()
1401+
require.NoError(t, registry.Add(workflowID, "test-source", drainable))
1402+
1403+
artifactStore := &stubWorkflowArtifactsStore{
1404+
spec: &job.WorkflowSpec{
1405+
WorkflowID: workflowID.Hex(),
1406+
Status: job.WorkflowSpecStatusActive,
1407+
},
1408+
}
1409+
h := &eventHandler{
1410+
lggr: logger.TestLogger(t),
1411+
engineRegistry: registry,
1412+
workflowArtifactsStore: artifactStore,
1413+
tracer: noop.NewTracerProvider().Tracer(""),
1414+
}
1415+
1416+
err := h.workflowRegisteredEvent(t.Context(), WorkflowRegisteredEvent{
1417+
Status: WorkflowStatusActive,
1418+
WorkflowID: workflowID,
1419+
})
1420+
require.Error(t, err)
1421+
require.Contains(t, err.Error(), "could not clean up old engine")
1422+
assert.Equal(t, int32(1), drainable.closeCalls.Load())
1423+
}
1424+
12711425
// mockLinkingService implements the LinkingServiceServer interface for testing
12721426
type mockLinkingService struct {
12731427
linkingclient.UnimplementedLinkingServiceServer

0 commit comments

Comments
 (0)