Skip to content

Commit ad05e09

Browse files
authored
Workflow Schedules (#331)
fixes #304
1 parent e437f66 commit ad05e09

46 files changed

Lines changed: 3850 additions & 582 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
.gradle
2+
.kotlin
23
build/
34
!gradle/wrapper/gradle-wrapper.jar
45
!**/src/main/**/build/
@@ -40,4 +41,4 @@ bin/
4041
.vscode/
4142

4243
### Mac OS ###
43-
.DS_Store
44+
.DS_Store

transact-cli/src/test/java/dev/dbos/transact/cli/PgContainer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@
1414

1515
public class PgContainer implements AutoCloseable {
1616

17-
// SIZE should match junit.jupiter.execution.parallel.config.fixed.parallelism value
18-
private static final int SIZE = 4;
17+
private static final int SIZE = Runtime.getRuntime().availableProcessors();
1918
private static final BlockingQueue<PostgreSQLContainer> POOL = new ArrayBlockingQueue<>(SIZE);
2019
private static final Semaphore PERMITS = new Semaphore(SIZE);
2120

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
junit.jupiter.execution.parallel.enabled = true
22
junit.jupiter.execution.parallel.mode.default = concurrent
33
junit.jupiter.execution.parallel.mode.classes.default = concurrent
4-
junit.jupiter.execution.parallel.config.strategy = fixed
5-
junit.jupiter.execution.parallel.config.fixed.parallelism = 4
4+
junit.jupiter.execution.parallel.config.strategy = dynamic
5+
junit.jupiter.execution.parallel.config.dynamic.factor = 1.0

transact/src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,23 @@
1616
import dev.dbos.transact.workflow.ForkOptions;
1717
import dev.dbos.transact.workflow.ListWorkflowsInput;
1818
import dev.dbos.transact.workflow.Queue;
19+
import dev.dbos.transact.workflow.ScheduleStatus;
1920
import dev.dbos.transact.workflow.SerializationStrategy;
2021
import dev.dbos.transact.workflow.StepInfo;
2122
import dev.dbos.transact.workflow.StepOptions;
2223
import dev.dbos.transact.workflow.VersionInfo;
2324
import dev.dbos.transact.workflow.Workflow;
2425
import dev.dbos.transact.workflow.WorkflowClassName;
2526
import dev.dbos.transact.workflow.WorkflowHandle;
27+
import dev.dbos.transact.workflow.WorkflowSchedule;
2628
import dev.dbos.transact.workflow.WorkflowStatus;
2729

2830
import java.io.IOException;
2931
import java.io.InputStream;
3032
import java.lang.reflect.Method;
3133
import java.time.Duration;
34+
import java.time.Instant;
35+
import java.time.ZoneId;
3236
import java.util.Collection;
3337
import java.util.HashSet;
3438
import java.util.List;
@@ -759,6 +763,122 @@ public void setLatestApplicationVersion(@NonNull String versionName) {
759763
ensureLaunched("setLatestApplicationVersion").setLatestApplicationVersion(versionName);
760764
}
761765

766+
/**
767+
* Create a cron schedule that periodically invokes a workflow. The scheduleId is generated if
768+
* null.
769+
*
770+
* @param schedule the schedule configuration
771+
*/
772+
public void createSchedule(
773+
@NonNull String scheduleName,
774+
@NonNull String workflowName,
775+
@NonNull String className,
776+
@NonNull String schedule,
777+
@Nullable Object context,
778+
boolean backfill,
779+
@Nullable ZoneId cronTimeZone,
780+
@Nullable String queueName) {
781+
ensureLaunched("createSchedule")
782+
.createSchedule(
783+
scheduleName,
784+
workflowName,
785+
className,
786+
schedule,
787+
context,
788+
backfill,
789+
cronTimeZone,
790+
queueName);
791+
}
792+
793+
/**
794+
* Get a schedule by name.
795+
*
796+
* @param name schedule name
797+
* @return the schedule, or empty if not found
798+
*/
799+
public @NonNull Optional<WorkflowSchedule> getSchedule(@NonNull String name) {
800+
return ensureLaunched("getSchedule").getSchedule(name);
801+
}
802+
803+
/**
804+
* List schedules with optional filters.
805+
*
806+
* @param status filter by status (e.g. "ACTIVE", "PAUSED"); null means no filter
807+
* @param workflowName filter by workflow name; null means no filter
808+
* @param namePrefix filter by schedule name prefix; null means no filter
809+
* @return matching schedules
810+
*/
811+
public @NonNull List<WorkflowSchedule> listSchedules(
812+
@Nullable List<ScheduleStatus> status,
813+
@Nullable List<String> workflowName,
814+
@Nullable List<String> namePrefix) {
815+
return ensureLaunched("listSchedules").listSchedules(status, workflowName, namePrefix);
816+
}
817+
818+
/**
819+
* Delete a schedule by name. No-op if the schedule does not exist.
820+
*
821+
* @param name schedule name
822+
*/
823+
public void deleteSchedule(@NonNull String name) {
824+
ensureLaunched("deleteSchedule").deleteSchedule(name);
825+
}
826+
827+
/**
828+
* Pause a schedule. A paused schedule does not fire.
829+
*
830+
* @param name schedule name
831+
*/
832+
public void pauseSchedule(@NonNull String name) {
833+
ensureLaunched("pauseSchedule").pauseSchedule(name);
834+
}
835+
836+
/**
837+
* Resume a paused schedule so it begins firing again.
838+
*
839+
* @param name schedule name
840+
*/
841+
public void resumeSchedule(@NonNull String name) {
842+
ensureLaunched("resumeSchedule").resumeSchedule(name);
843+
}
844+
845+
/**
846+
* Atomically create or replace a set of schedules. Each schedule is deleted (if it exists) and
847+
* re-created in a single transaction.
848+
*
849+
* @param schedules the schedules to apply
850+
*/
851+
public void applySchedules(@NonNull List<WorkflowSchedule> schedules) {
852+
ensureLaunched("applySchedules").applySchedules(schedules);
853+
}
854+
855+
// /**
856+
// * Enqueue all executions of a schedule that would have run between {@code start} (exclusive)
857+
// and
858+
// * {@code end} (exclusive). Uses the same deterministic workflow IDs as the live scheduler, so
859+
// * already-executed times are skipped.
860+
// *
861+
// * @param scheduleName name of an existing schedule
862+
// * @param start start of the backfill window (exclusive)
863+
// * @param end end of the backfill window (exclusive)
864+
// * @return handles to the enqueued executions
865+
// */
866+
public @NonNull List<WorkflowHandle<Object, Exception>> backfillSchedule(
867+
@NonNull String scheduleName, @NonNull Instant start, @NonNull Instant end) {
868+
return ensureLaunched("backfillSchedule").backfillSchedule(scheduleName, start, end);
869+
}
870+
871+
// /**
872+
// * Immediately enqueue the scheduled workflow at the current time.
873+
// *
874+
// * @param scheduleName name of an existing schedule
875+
// * @return handle to the enqueued execution
876+
// */
877+
public <T, E extends Exception> @NonNull WorkflowHandle<T, E> triggerSchedule(
878+
@NonNull String scheduleName) {
879+
return ensureLaunched("triggerSchedule").triggerSchedule(scheduleName);
880+
}
881+
762882
/**
763883
* Retrieve a handle to a workflow, given its ID. Note that a handle is always returned, whether
764884
* the workflow exists or not; getStatus() can be used to tell the difference

transact/src/main/java/dev/dbos/transact/DBOSClient.java

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,20 @@
88
import dev.dbos.transact.json.SerializationUtil;
99
import dev.dbos.transact.workflow.ForkOptions;
1010
import dev.dbos.transact.workflow.ListWorkflowsInput;
11+
import dev.dbos.transact.workflow.ScheduleStatus;
1112
import dev.dbos.transact.workflow.SerializationStrategy;
1213
import dev.dbos.transact.workflow.StepInfo;
1314
import dev.dbos.transact.workflow.Timeout;
1415
import dev.dbos.transact.workflow.VersionInfo;
1516
import dev.dbos.transact.workflow.WorkflowHandle;
17+
import dev.dbos.transact.workflow.WorkflowSchedule;
1618
import dev.dbos.transact.workflow.WorkflowState;
1719
import dev.dbos.transact.workflow.WorkflowStatus;
1820
import dev.dbos.transact.workflow.internal.WorkflowStatusInternal;
1921

2022
import java.time.Duration;
2123
import java.time.Instant;
24+
import java.time.ZoneId;
2225
import java.util.List;
2326
import java.util.Map;
2427
import java.util.Objects;
@@ -829,4 +832,119 @@ public void deleteWorkflows(@NonNull List<String> workflowIds, boolean deleteChi
829832
public void setLatestApplicationVersion(@NonNull String versionName) {
830833
systemDatabase.updateApplicationVersionTimestamp(versionName, Instant.now());
831834
}
835+
836+
/**
837+
* Create a cron schedule. The scheduleId is generated if null.
838+
*
839+
* @param schedule the schedule configuration
840+
*/
841+
public void createSchedule(
842+
@NonNull String scheduleName,
843+
@NonNull String workflowName,
844+
@NonNull String className,
845+
@NonNull String schedule,
846+
@Nullable Object context,
847+
boolean backfill,
848+
@Nullable ZoneId cronTimeZone,
849+
@Nullable String queueName) {
850+
DBOSExecutor.createSchedule(
851+
scheduleName,
852+
workflowName,
853+
className,
854+
schedule,
855+
context,
856+
backfill,
857+
cronTimeZone,
858+
queueName,
859+
wfSchedule -> systemDatabase.createSchedule(wfSchedule));
860+
}
861+
862+
/**
863+
* Get a schedule by name.
864+
*
865+
* @param name schedule name
866+
* @return the schedule, or empty if not found
867+
*/
868+
public @NonNull Optional<WorkflowSchedule> getSchedule(@NonNull String name) {
869+
return systemDatabase.getSchedule(name);
870+
}
871+
872+
/**
873+
* List schedules with optional filters.
874+
*
875+
* @param status filter by status; null means no filter
876+
* @param workflowName filter by workflow name; null means no filter
877+
* @param namePrefix filter by schedule name prefix; null means no filter
878+
* @return matching schedules
879+
*/
880+
public @NonNull List<WorkflowSchedule> listSchedules(
881+
@Nullable List<ScheduleStatus> status,
882+
@Nullable List<String> workflowName,
883+
@Nullable List<String> namePrefix) {
884+
return systemDatabase.listSchedules(status, workflowName, namePrefix);
885+
}
886+
887+
/**
888+
* Delete a schedule by name. No-op if the schedule does not exist.
889+
*
890+
* @param name schedule name
891+
*/
892+
public void deleteSchedule(@NonNull String name) {
893+
systemDatabase.deleteSchedule(name);
894+
}
895+
896+
/**
897+
* Pause a schedule. A paused schedule does not fire.
898+
*
899+
* @param name schedule name
900+
*/
901+
public void pauseSchedule(@NonNull String name) {
902+
systemDatabase.pauseSchedule(name);
903+
}
904+
905+
/**
906+
* Resume a paused schedule so it begins firing again.
907+
*
908+
* @param name schedule name
909+
*/
910+
public void resumeSchedule(@NonNull String name) {
911+
systemDatabase.resumeSchedule(name);
912+
}
913+
914+
/**
915+
* Atomically create or replace a set of schedules.
916+
*
917+
* @param schedules the schedules to apply
918+
*/
919+
public void applySchedules(@NonNull List<WorkflowSchedule> schedules) {
920+
systemDatabase.applySchedules(schedules);
921+
}
922+
923+
// /**
924+
// * Enqueue all executions of a schedule that would have run between {@code start} (exclusive)
925+
// and
926+
// * {@code end} (exclusive).
927+
// *
928+
// * @param scheduleName name of an existing schedule
929+
// * @param start start of the backfill window (exclusive)
930+
// * @param end end of the backfill window (exclusive)
931+
// * @return handles to the enqueued executions
932+
// */
933+
public @NonNull List<WorkflowHandle<Object, Exception>> backfillSchedule(
934+
@NonNull String scheduleName, @NonNull Instant start, @NonNull Instant end) {
935+
var ids = DBOSExecutor.backfillSchedule(scheduleName, start, end, systemDatabase, serializer);
936+
return ids.stream().<WorkflowHandle<Object, Exception>>map(this::retrieveWorkflow).toList();
937+
}
938+
939+
// /**
940+
// * Immediately enqueue the scheduled workflow at the current time.
941+
// *
942+
// * @param scheduleName name of an existing schedule
943+
// * @return handle to the enqueued execution
944+
// */
945+
public <T, E extends Exception> @NonNull WorkflowHandle<T, E> triggerSchedule(
946+
@NonNull String scheduleName) {
947+
var id = DBOSExecutor.triggerSchedule(scheduleName, systemDatabase, serializer);
948+
return retrieveWorkflow(id);
949+
}
832950
}

0 commit comments

Comments
 (0)