Skip to content

Commit ae52d5a

Browse files
authored
[CRE] Engine: fix a bug in eventAge calculation (#21985)
1 parent 3f817f2 commit ae52d5a

2 files changed

Lines changed: 146 additions & 1 deletion

File tree

core/services/workflows/v2/engine.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ func (e *Engine) handleAllTriggerEvents(ctx context.Context) {
589589
if err != nil {
590590
return
591591
}
592-
eventAge := queueHead.timestamp.Sub(e.cfg.Clock.Now())
592+
eventAge := e.cfg.Clock.Now().Sub(queueHead.timestamp)
593593
eventID := queueHead.event.Event.ID
594594
e.logger().Debugw("Popped a trigger event from the queue", "eventID", eventID, "eventAgeMs", eventAge.Milliseconds())
595595
triggerEventMaxAge, err := e.cfg.LocalLimiters.TriggerEventQueueTime.Limit(ctx)

core/services/workflows/v2/engine_execution_concurrency_test.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,21 @@ package v2_test
22

33
import (
44
"context"
5+
"fmt"
56
"runtime"
67
"slices"
78
"sync"
89
"testing"
910
"time"
1011

12+
"github.com/jonboulle/clockwork"
1113
"github.com/stretchr/testify/mock"
1214
"github.com/stretchr/testify/require"
15+
"go.uber.org/zap/zapcore"
16+
"go.uber.org/zap/zaptest/observer"
1317

1418
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
19+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
1520
"github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings"
1621
modulemocks "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host/mocks"
1722
sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk"
@@ -145,3 +150,143 @@ func TestEngine_ExecutionConcurrencySerializesOverlappingRuns(t *testing.T) {
145150

146151
require.NoError(t, engine.Close())
147152
}
153+
154+
// TestEngine_StaleTriggerEventIsSkipped proves that trigger events older than
155+
// TriggerEventQueueTimeout are dropped and never reach Module.Execute.
156+
//
157+
// Strategy: with ExecutionConcurrencyLimit=1 and a FakeClock we can control
158+
// exactly which events age out. We send 7 "early" events. event_0's Execute
159+
// blocks (holding the semaphore), event_1 gets popped by the handler but
160+
// stalls at the semaphore (already past its age check), and events 2-6 remain
161+
// in the queue. After advancing the fake clock past the timeout, we unblock
162+
// event_0. event_1 resumes and executes (it already cleared the age check),
163+
// while events 2-6 are popped and detected as stale (5 skipped). Then 3
164+
// fresh events are sent and all execute. Total: 10 events, 5 expire, 5
165+
// execute.
166+
func TestEngine_StaleTriggerEventIsSkipped(t *testing.T) {
167+
t.Parallel()
168+
169+
const queueTimeout = 5 * time.Second
170+
171+
fakeClock := clockwork.NewFakeClock()
172+
blockerStarted := make(chan struct{})
173+
blockerRelease := make(chan struct{})
174+
175+
module := modulemocks.NewModuleV2(t)
176+
module.EXPECT().Start().Once()
177+
// init call → return trigger subscriptions
178+
module.EXPECT().Execute(matches.AnyContext, mock.Anything, mock.Anything).
179+
Return(newTriggerSubs(1), nil).Once()
180+
// event_0 → block until we release it (holds the semaphore)
181+
module.EXPECT().Execute(matches.AnyContext, mock.Anything, mock.Anything).
182+
Run(func(_ context.Context, _ *sdkpb.ExecuteRequest, _ host.ExecutionHelper) {
183+
close(blockerStarted)
184+
<-blockerRelease
185+
}).Return(nil, nil).Once()
186+
// event_1 + 3 fresh events = 4 fast executions
187+
module.EXPECT().Execute(matches.AnyContext, mock.Anything, mock.Anything).
188+
Return(nil, nil).Times(4)
189+
module.EXPECT().Close().Once()
190+
191+
capreg := regmocks.NewCapabilitiesRegistry(t)
192+
capreg.EXPECT().LocalNode(matches.AnyContext).Return(newNode(t), nil).Once()
193+
194+
initDoneCh := make(chan error, 1)
195+
subscribedToTriggersCh := make(chan []string, 1)
196+
executionFinishedCh := make(chan string, 10)
197+
198+
var lggr logger.Logger
199+
var logs *observer.ObservedLogs
200+
201+
cfg := defaultTestConfig(t, func(cfg *cresettings.Workflows) {
202+
cfg.TriggerEventQueueTimeout.DefaultValue = queueTimeout
203+
cfg.ExecutionConcurrencyLimit.DefaultValue = 1
204+
})
205+
lggr, logs = logger.TestObserved(t, zapcore.WarnLevel)
206+
cfg.Lggr = lggr
207+
cfg.Clock = fakeClock
208+
cfg.Module = module
209+
cfg.CapRegistry = capreg
210+
cfg.BillingClient = setupMockBillingClient(t)
211+
212+
wantExecIDs := make(map[string]struct{}, 5)
213+
for _, eid := range []string{"event_0", "event_1", "fresh_0", "fresh_1", "fresh_2"} {
214+
id, err := workflowEvents.GenerateExecutionID(cfg.WorkflowID, eid)
215+
require.NoError(t, err)
216+
wantExecIDs[id] = struct{}{}
217+
}
218+
219+
cfg.Hooks = v2.LifecycleHooks{
220+
OnInitialized: func(err error) {
221+
initDoneCh <- err
222+
},
223+
OnSubscribedToTriggers: func(triggerIDs []string) {
224+
subscribedToTriggersCh <- triggerIDs
225+
},
226+
OnExecutionFinished: func(executionID string, _ string) {
227+
executionFinishedCh <- executionID
228+
},
229+
}
230+
231+
engine, err := v2.NewEngine(cfg)
232+
require.NoError(t, err)
233+
234+
trigger := capmocks.NewTriggerCapability(t)
235+
capreg.EXPECT().GetTrigger(matches.AnyContext, "id_0").Return(trigger, nil).Once()
236+
eventCh := make(chan capabilities.TriggerResponse)
237+
trigger.EXPECT().RegisterTrigger(matches.AnyContext, mock.Anything).Return(eventCh, nil).Once()
238+
trigger.EXPECT().UnregisterTrigger(matches.AnyContext, mock.Anything).Return(nil).Once()
239+
trigger.EXPECT().AckEvent(matches.AnyContext, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
240+
241+
require.NoError(t, engine.Start(t.Context()))
242+
require.NoError(t, <-initDoneCh)
243+
require.Equal(t, []string{"id_0"}, <-subscribedToTriggersCh)
244+
245+
// Send 7 events that will be timestamped at the current fake clock time.
246+
// event_0 will execute and block; event_1 will be popped but stall at the
247+
// semaphore; events 2-6 will sit in the queue.
248+
for i := range 7 {
249+
eventCh <- capabilities.TriggerResponse{
250+
Event: capabilities.TriggerEvent{
251+
TriggerType: "basic-trigger@1.0.0",
252+
ID: fmt.Sprintf("event_%d", i),
253+
},
254+
}
255+
}
256+
257+
// Wait for event_0's Execute to start (semaphore is now held).
258+
<-blockerStarted
259+
// Give the handler goroutine time to pop event_1 and block at the semaphore.
260+
time.Sleep(200 * time.Millisecond)
261+
262+
// Advance the fake clock so events still in the queue become stale.
263+
fakeClock.Advance(queueTimeout + time.Second)
264+
265+
// Unblock event_0 → event_1 will resume (already past age check),
266+
// events 2-6 will be detected as too old.
267+
close(blockerRelease)
268+
269+
// Send 3 fresh events — timestamped at the advanced clock time.
270+
for i := range 3 {
271+
eventCh <- capabilities.TriggerResponse{
272+
Event: capabilities.TriggerEvent{
273+
TriggerType: "basic-trigger@1.0.0",
274+
ID: fmt.Sprintf("fresh_%d", i),
275+
},
276+
}
277+
}
278+
279+
gotIDs := make(map[string]struct{}, 5)
280+
for range 5 {
281+
gotIDs[<-executionFinishedCh] = struct{}{}
282+
}
283+
require.Equal(t, wantExecIDs, gotIDs,
284+
"expected exactly 5 executions: event_0, event_1, and 3 fresh events")
285+
286+
require.Eventually(t, func() bool {
287+
return logs.FilterMessage("Trigger event is too old, skipping execution").Len() >= 5
288+
}, 2*time.Second, 50*time.Millisecond,
289+
"expected 5 stale-event warnings for events 2-6")
290+
291+
require.NoError(t, engine.Close())
292+
}

0 commit comments

Comments
 (0)