22
33import dev .dbos .transact .Constants ;
44import dev .dbos .transact .StartWorkflowOptions ;
5- import dev .dbos .transact .database .ExternalState ;
65import dev .dbos .transact .database .SystemDatabase ;
7- import dev .dbos .transact .workflow .Scheduled ;
86import dev .dbos .transact .workflow .WorkflowSchedule ;
97
10- import java .math .BigInteger ;
118import java .time .Duration ;
129import java .time .Instant ;
1310import java .time .ZoneId ;
1411import java .time .ZonedDateTime ;
1512import java .time .temporal .ChronoUnit ;
1613import java .util .Arrays ;
17- import java .util .List ;
1814import java .util .Objects ;
1915import java .util .concurrent .ConcurrentHashMap ;
2016import java .util .concurrent .Executors ;
@@ -38,32 +34,12 @@ public class SchedulerService implements AutoCloseable {
3834
3935 private static final Logger logger = LoggerFactory .getLogger (SchedulerService .class );
4036
41- record AnnotatedScheduledWorkflow (
42- RegisteredWorkflow workflow , Cron cron , String queue , boolean automaticBackfill ) {}
43-
4437 public static final CronParser CRON_PARSER =
4538 new CronParser (CronDefinitionBuilder .instanceDefinitionFor (CronType .SPRING53 ));
46- private static final Class <?>[] ANNOTATED_EXPECTED_PARAMETERS =
47- new Class <?>[] {Instant .class , Instant .class };
4839 private static final Class <?>[] EXPECTED_PARAMETERS =
4940 new Class <?>[] {Instant .class , Object .class };
5041 private static final Duration MAX_JITTER = Duration .ofSeconds (10 );
5142
52- public static void validateAnnotatedWorkflowSchedule (RegisteredWorkflow workflow ) {
53- var method = workflow .workflowMethod ();
54- var skedTag = method .getAnnotation (Scheduled .class );
55- if (skedTag != null ) {
56- var paramTypes = method .getParameterTypes ();
57- if (!Arrays .equals (paramTypes , ANNOTATED_EXPECTED_PARAMETERS )) {
58- throw new IllegalArgumentException (
59- "Invalid signature for annotated workflow schedule %s. Signature must be (Instant, Instant)"
60- .formatted (workflow .fullyQualifiedName ()));
61- }
62-
63- CRON_PARSER .parse (skedTag .cron ()).validate ();
64- }
65- }
66-
6743 private final DBOSExecutor dbosExecutor ;
6844 private final SystemDatabase systemDatabase ;
6945 private final Duration pollingInterval ;
@@ -86,7 +62,6 @@ public void start() {
8662 if (this .execServiceRef .compareAndSet (null , scheduler )) {
8763 scheduler .scheduleAtFixedRate (
8864 this ::pollWorkflowSchedules , 0 , pollingInterval .toMillis (), TimeUnit .MILLISECONDS );
89- startAnnotatedSchedules ();
9065 }
9166 }
9267 }
@@ -277,76 +252,6 @@ public void run() {
277252 }
278253 }
279254
280- private void startAnnotatedSchedules () {
281- var annotatedSchedules = getAnnotatedWorkflowSchedules ();
282- logger .debug ("startAnnotatedSchedules found {} annotated schedules" , annotatedSchedules .size ());
283-
284- for (var swf : annotatedSchedules ) {
285- logger .debug (
286- "Registering annotated schedule {} with cron {}" ,
287- swf .workflow ().fullyQualifiedName (),
288- swf .cron ());
289- var task =
290- new Runnable () {
291-
292- final ExecutionTime executionTime = ExecutionTime .forCron (swf .cron ());
293- final String workflowName = swf .workflow ().fullyQualifiedName ();
294-
295- ZonedDateTime nextTime = getLastTime (dbosExecutor , swf );
296-
297- public void schedule () {
298- executionTime
299- .nextExecution (nextTime )
300- .ifPresent (
301- cronTime -> {
302- this .nextTime = cronTime .truncatedTo (ChronoUnit .SECONDS );
303- scheduleTask (this .nextTime , this );
304- });
305- }
306-
307- @ Override
308- public void run () {
309- // if execServiceRef is null, the scheduler service was shut down so don't start the
310- // workflow or schedule the next execution
311- if (execServiceRef .get () == null ) {
312- return ;
313- }
314-
315- var scheduledTime = nextTime ;
316- try {
317- if (paused .get ()) {
318- logger .debug (
319- "Skipping annotated workflow {} schedule {} because scheduler is paused" ,
320- workflowName ,
321- swf .cron ());
322- return ;
323- }
324- var args = new Object [] {scheduledTime .toInstant (), Instant .now ()};
325- var workflowId =
326- "sched-%s-%s" .formatted (workflowName , scheduledTime .toOffsetDateTime ());
327- logger .debug (
328- "Triggering annotated workflow {} at {} workflowId {}" ,
329- workflowName ,
330- args [1 ],
331- workflowId );
332- var appVersion = dbosExecutor .getLatestApplicationVersion ().versionName ();
333- var options =
334- new StartWorkflowOptions (workflowId )
335- .withQueue (swf .queue ())
336- .withAppVersion (appVersion );
337- dbosExecutor .startRegisteredWorkflow (swf .workflow (), args , options );
338- nextTime = setLastTime (dbosExecutor , swf , scheduledTime );
339- } catch (Exception e ) {
340- logger .error ("Annotated scheduled task exception {}" , workflowName , e );
341- } finally {
342- schedule ();
343- }
344- }
345- };
346- task .schedule ();
347- }
348- }
349-
350255 private ScheduledFuture <?> scheduleTask (ZonedDateTime nextTime , Runnable task ) {
351256 // to prevent the "thundering herd" problem in a distributed setting,
352257 // apply a jitter of up to 10% of the sleep time, capped at 10 seconds
@@ -371,80 +276,4 @@ private void cancelWorkflowSchedule(String scheduleId) {
371276 future .cancel (false );
372277 }
373278 }
374-
375- private static ZonedDateTime getLastTime (
376- DBOSExecutor dbosExecutor , AnnotatedScheduledWorkflow swf ) {
377- if (swf .automaticBackfill ()) {
378- var state =
379- dbosExecutor .getExternalState (
380- "DBOS.SchedulerService" , swf .workflow ().fullyQualifiedName (), "lastTime" );
381- if (state .isPresent ()) {
382- return ZonedDateTime .parse (state .get ().value ());
383- }
384- }
385- return ZonedDateTime .now ();
386- }
387-
388- private static ZonedDateTime setLastTime (
389- DBOSExecutor dbosExecutor , AnnotatedScheduledWorkflow swf , ZonedDateTime lastTime ) {
390- if (!swf .automaticBackfill ()) {
391- return ZonedDateTime .now ();
392- }
393-
394- var state =
395- dbosExecutor .upsertExternalState (
396- new ExternalState (
397- "DBOS.SchedulerService" ,
398- swf .workflow ().fullyQualifiedName (),
399- "lastTime" ,
400- lastTime .toString (),
401- null ,
402- BigInteger .valueOf (lastTime .toInstant ().toEpochMilli ())));
403-
404- return ZonedDateTime .parse (state .value ()).plus (1 , ChronoUnit .MILLIS );
405- }
406-
407- private List <AnnotatedScheduledWorkflow > getAnnotatedWorkflowSchedules () {
408- return dbosExecutor .getRegisteredWorkflows ().stream ()
409- .map (
410- wf -> {
411- var method = wf .workflowMethod ();
412- var schedTag = method .getAnnotation (Scheduled .class );
413- if (schedTag == null ) {
414- return null ;
415- }
416-
417- if (!Arrays .equals (method .getParameterTypes (), ANNOTATED_EXPECTED_PARAMETERS )) {
418- logger .error (
419- "Annotated workflow schedule {} has invalid signature, signature must be (Instant, Instant)" ,
420- wf .fullyQualifiedName ());
421- return null ;
422- }
423-
424- var queueName =
425- schedTag .queue ().isEmpty () ? Constants .DBOS_INTERNAL_QUEUE : schedTag .queue ();
426- if (dbosExecutor .getQueue (queueName ).isEmpty ()) {
427- logger .error (
428- "Annotated workflow schedule {} refers to undefined queue {}" ,
429- wf .fullyQualifiedName (),
430- queueName );
431- return null ;
432- }
433-
434- try {
435- var cron = CRON_PARSER .parse (schedTag .cron ()).validate ();
436- return new AnnotatedScheduledWorkflow (
437- wf , cron , queueName , schedTag .automaticBackfill ());
438- } catch (Exception e ) {
439- logger .error (
440- "Annotated workflow schedule {} has invalid cron expression {}" ,
441- wf .fullyQualifiedName (),
442- schedTag .cron (),
443- e );
444- return null ;
445- }
446- })
447- .filter (Objects ::nonNull )
448- .toList ();
449- }
450279}
0 commit comments