66 "fmt"
77 "time"
88
9+ "github.com/go-co-op/gocron/v2"
910 "google.golang.org/protobuf/types/known/timestamppb"
1011
1112 "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
@@ -25,6 +26,7 @@ var _ cronserver.CronCapability = (*ManualCronTriggerService)(nil)
2526const ServiceName = "CronTriggerService"
2627const ID = "cron-trigger@1.0.0"
2728const defaultFastestScheduleIntervalSeconds = 1
29+ const allowSeconds = true
2830
2931var manualCronTriggerInfo = capabilities .MustNewCapabilityInfo (
3032 ID ,
@@ -43,19 +45,28 @@ type ManualCronTriggerService struct {
4345 callbackCh map [string ]chan capabilities.TriggerAndId [* crontypedapi.Payload ]
4446 legacyCallbackCh chan capabilities.TriggerAndId [* crontypedapi.LegacyPayload ] //nolint:staticcheck // LegacyPayload intentionally used for backward compatibility
4547 workflowIDs map [string ]string // triggerID -> workflowID mapping
48+ triggerConfigs map [string ]* crontypedapi.Config
49+ scheduler gocron.Scheduler
4650}
4751
48- func NewManualCronTriggerService (parentLggr logger.Logger ) * ManualCronTriggerService {
52+ func NewManualCronTriggerService (parentLggr logger.Logger ) ( * ManualCronTriggerService , error ) {
4953 lggr := logger .Named (parentLggr , "CronTriggerService" ) // ManualCronTriggerService
5054
55+ scheduler , err := gocron .NewScheduler ()
56+ if err != nil {
57+ return nil , fmt .Errorf ("failed to create cron scheduler: %w" , err )
58+ }
59+
5160 return & ManualCronTriggerService {
5261 CapabilityInfo : manualCronTriggerInfo ,
5362 config : ManualCronConfig {FastestScheduleIntervalSeconds : 1 },
5463 lggr : lggr ,
5564 callbackCh : make (map [string ]chan capabilities.TriggerAndId [* crontypedapi.Payload ]),
5665 legacyCallbackCh : make (chan capabilities.TriggerAndId [* crontypedapi.LegacyPayload ]), //nolint:staticcheck // LegacyPayload intentionally used for backward compatibility
5766 workflowIDs : make (map [string ]string ),
58- }
67+ triggerConfigs : make (map [string ]* crontypedapi.Config ),
68+ scheduler : scheduler ,
69+ }, nil
5970}
6071
6172func (f * ManualCronTriggerService ) Initialise (ctx context.Context , dependencies core.StandardCapabilitiesDependencies ) error {
@@ -75,17 +86,17 @@ func (f *ManualCronTriggerService) Initialise(ctx context.Context, dependencies
7586
7687 f .config = cronConfig
7788
78- err := f .Start (ctx )
79- if err != nil {
89+ if err := f .Start (ctx ); err != nil {
8090 return fmt .Errorf ("error when starting trigger service: %w" , err )
8191 }
8292
8393 return nil
8494}
8595
8696func (f * ManualCronTriggerService ) RegisterTrigger (ctx context.Context , triggerID string , metadata capabilities.RequestMetadata , input * crontypedapi.Config ) (<- chan capabilities.TriggerAndId [* crontypedapi.Payload ], caperrors.Error ) {
87- f .callbackCh [triggerID ] = make (chan capabilities.TriggerAndId [* crontypedapi.Payload ])
97+ f .callbackCh [triggerID ] = make (chan capabilities.TriggerAndId [* crontypedapi.Payload ], 1 )
8898 f .workflowIDs [triggerID ] = metadata .WorkflowID
99+ f .triggerConfigs [triggerID ] = input
89100 return f .callbackCh [triggerID ], nil
90101}
91102
@@ -105,7 +116,28 @@ func (f *ManualCronTriggerService) AckEvent(ctx context.Context, triggerID strin
105116 return nil
106117}
107118
108- func (f * ManualCronTriggerService ) ManualTrigger (ctx context.Context , triggerID string , scheduledExecutionTime time.Time ) error {
119+ func (f * ManualCronTriggerService ) ManualTrigger (ctx context.Context , triggerID string , skipWait <- chan struct {}) error {
120+ config , exists := f .triggerConfigs [triggerID ]
121+ if ! exists {
122+ return fmt .Errorf (`trigger config "%s" not found` , triggerID )
123+ }
124+
125+ jobFired := make (chan struct {}, 1 )
126+ job , err := f .scheduler .NewJob (
127+ gocron .CronJob (config .Schedule , allowSeconds ),
128+ gocron .NewTask (func () {
129+ defer close (jobFired )
130+ jobFired <- struct {}{}
131+ }),
132+ )
133+ if err != nil {
134+ return fmt .Errorf ("failed to create cron job: %w" , err )
135+ }
136+ scheduledExecutionTime , err := job .NextRun ()
137+ if err != nil {
138+ return fmt .Errorf ("failed to get next scheduled execution time: %w" , err )
139+ }
140+
109141 f .lggr .Debugf ("ManualTrigger: %s" , scheduledExecutionTime .Format (time .RFC3339Nano ))
110142
111143 triggerEvent := f .createManualTriggerEvent (scheduledExecutionTime )
@@ -128,16 +160,22 @@ func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID
128160 f .lggr .Errorw ("failed to emit trigger execution started event" , "err" , err )
129161 }
130162
131- go func () {
132- select {
133- case f .callbackCh [triggerID ] <- triggerEvent :
134- // Successfully sent trigger response
135- case <- ctx .Done ():
136- // Context cancelled, cleanup goroutine
137- f .lggr .Debug ("ManualTrigger goroutine cancelled due to context cancellation" )
138- }
163+ defer func () {
164+ _ = f .scheduler .RemoveJob (job .ID ())
139165 }()
140166
167+ // Either wait for cron scheduler or skip wait signal
168+ select {
169+ case <- skipWait :
170+ break
171+ case <- jobFired :
172+ break
173+ case <- ctx .Done ():
174+ return ctx .Err ()
175+ }
176+
177+ // Sent trigger response
178+ f .callbackCh [triggerID ] <- triggerEvent
141179 return nil
142180}
143181
@@ -161,11 +199,15 @@ func (f *ManualCronTriggerService) createManualTriggerEvent(scheduledExecutionTi
161199
162200func (f * ManualCronTriggerService ) Start (ctx context.Context ) error {
163201 f .lggr .Debugw ("Starting ManualCronTriggerService" )
202+ f .scheduler .Start ()
164203 return nil
165204}
166205
167206func (f * ManualCronTriggerService ) Close () error {
168207 f .lggr .Debug ("Closing ManualCronTriggerService" )
208+ if err := f .scheduler .Shutdown (); err != nil {
209+ f .lggr .Errorw ("failed to close scheduler" , "err" , err )
210+ }
169211 return nil
170212}
171213
0 commit comments