Skip to content

Commit cc601d6

Browse files
authored
Event receiver infra and scheduler improvements (#200)
**Infrastructure for pluggable event receivers** Add lifecycle listeners and workflow execution entrypoints. Allow deadline to be specified in start options (consolidates code also) Test for same. **Scheduler improvements:** Make scheduler less special (uses event receivers except for its validator) Use user-specified queues. Add mode where missed executions are not ignored, but made up Fix scheduler's key calculation to be on second boundaries (as it should be) Check all things in one test, rather than one per run, saving over 10 seconds of test time
1 parent 1b2d23c commit cc601d6

25 files changed

Lines changed: 814 additions & 196 deletions

transact/src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 71 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
import dev.dbos.transact.context.DBOSContextHolder;
66
import dev.dbos.transact.database.ExternalState;
77
import dev.dbos.transact.execution.DBOSExecutor;
8+
import dev.dbos.transact.execution.DBOSLifecycleListener;
9+
import dev.dbos.transact.execution.RegisteredWorkflow;
10+
import dev.dbos.transact.execution.RegisteredWorkflowInstance;
811
import dev.dbos.transact.execution.ThrowingRunnable;
912
import dev.dbos.transact.execution.ThrowingSupplier;
1013
import dev.dbos.transact.internal.DBOSInvocationHandler;
@@ -17,9 +20,13 @@
1720

1821
import java.lang.reflect.Method;
1922
import java.time.Duration;
23+
import java.util.Collection;
24+
import java.util.HashSet;
2025
import java.util.List;
2126
import java.util.Objects;
2227
import java.util.Optional;
28+
import java.util.Set;
29+
import java.util.concurrent.ConcurrentHashMap;
2330
import java.util.concurrent.atomic.AtomicReference;
2431

2532
import org.slf4j.Logger;
@@ -38,6 +45,7 @@ private DBOS() {}
3845
public static class Instance {
3946
private final WorkflowRegistry workflowRegistry = new WorkflowRegistry();
4047
private final QueueRegistry queueRegistry = new QueueRegistry();
48+
private final Set<DBOSLifecycleListener> lifecycleRegistry = ConcurrentHashMap.newKeySet();
4149

4250
private DBOSConfig config;
4351

@@ -61,33 +69,40 @@ private void registerClassWorkflows(
6169
throw new IllegalStateException("Cannot register workflow after DBOS is launched");
6270
}
6371

72+
String className = implementation.getClass().getName();
73+
workflowRegistry.register(interfaceClass, implementation, className, instanceName);
74+
6475
Method[] methods = implementation.getClass().getDeclaredMethods();
6576
for (Method method : methods) {
6677
Workflow wfAnnotation = method.getAnnotation(Workflow.class);
6778
if (wfAnnotation != null) {
6879
method.setAccessible(true); // In case it's not public
69-
registerWorkflowMethod(wfAnnotation, implementation, instanceName, method);
80+
registerWorkflowMethod(wfAnnotation, implementation, className, instanceName, method);
7081
}
7182
}
7283
}
7384

7485
private String registerWorkflowMethod(
75-
Workflow wfTag, Object target, String instanceName, Method method) {
86+
Workflow wfTag, Object target, String className, String instanceName, Method method) {
7687
if (dbosExecutor.get() != null) {
7788
throw new IllegalStateException("Cannot register workflow after DBOS is launched");
7889
}
7990

8091
String name = wfTag.name().isEmpty() ? method.getName() : wfTag.name();
8192
workflowRegistry.register(
82-
target.getClass().getName(),
83-
name,
84-
target,
85-
instanceName,
86-
method,
87-
wfTag.maxRecoveryAttempts());
93+
className, name, target, instanceName, method, wfTag.maxRecoveryAttempts());
8894
return name;
8995
}
9096

97+
void registerLifecycleListener(DBOSLifecycleListener l) {
98+
if (dbosExecutor.get() != null) {
99+
throw new IllegalStateException(
100+
"Cannot register lifecycle listener after DBOS is launched");
101+
}
102+
103+
lifecycleRegistry.add(l);
104+
}
105+
91106
void registerQueue(Queue queue) {
92107
if (dbosExecutor.get() != null) {
93108
throw new IllegalStateException("Cannot build a queue after DBOS is launched");
@@ -117,6 +132,7 @@ private void registerInternals() {
117132
void clearRegistry() {
118133
workflowRegistry.clear();
119134
queueRegistry.clear();
135+
lifecycleRegistry.clear();
120136

121137
registerInternals();
122138
}
@@ -153,7 +169,12 @@ public void launch() {
153169
var executor = new DBOSExecutor(config);
154170

155171
if (dbosExecutor.compareAndSet(null, executor)) {
156-
executor.start(this, workflowRegistry.getSnapshot(), queueRegistry.getSnapshot());
172+
executor.start(
173+
this,
174+
new HashSet<DBOSLifecycleListener>(this.lifecycleRegistry),
175+
workflowRegistry.getWorkflowSnapshot(),
176+
workflowRegistry.getInstanceSnapshot(),
177+
queueRegistry.getSnapshot());
157178
}
158179
}
159180
}
@@ -211,6 +232,15 @@ public static Queue registerQueue(Queue queue) {
211232
return queue;
212233
}
213234

235+
/**
236+
* Register a lifecycle listener that receives callbacks when DBOS is launched or shut down
237+
*
238+
* @param listener
239+
*/
240+
public static void registerLifecycleListener(DBOSLifecycleListener listener) {
241+
ensureInstance().registerLifecycleListener(listener);
242+
}
243+
214244
/**
215245
* Reinitializes the singleton instance of DBOS with config. For use in tests that reinitialize
216246
* DBOS @DBOSConfig config dbos configuration
@@ -620,6 +650,38 @@ public static List<StepInfo> listWorkflowSteps(String workflowId) {
620650
return executor("listWorkflowSteps").listWorkflowSteps(workflowId);
621651
}
622652

653+
/**
654+
* Get all workflows registered with DBOS.
655+
*
656+
* @return list of all registered workflow methods
657+
*/
658+
public static Collection<RegisteredWorkflow> getRegisteredWorkflows() {
659+
return executor("getRegisteredWorkflows").getWorkflows();
660+
}
661+
662+
/**
663+
* Get all workflow classes registered with DBOS.
664+
*
665+
* @return list of all class instances containing registered workflow methods
666+
*/
667+
public static Collection<RegisteredWorkflowInstance> getRegisteredWorkflowInstances() {
668+
return executor("getRegisteredWorkflowInstances").getInstances();
669+
}
670+
671+
/**
672+
* Execute a workflow based on registration and arguments. This is expected to be used by generic
673+
* callers, not app code.
674+
*
675+
* @param regWorkflow Registration of the workflow. @see getRegisteredWorkflows
676+
* @param args Workflow function arguments
677+
* @param options Execution options, such as ID, queue, and timeout/deadline
678+
* @return WorkflowHandle to the executed workflow
679+
*/
680+
public static WorkflowHandle<?, ?> startWorkflow(
681+
RegisteredWorkflow regWorkflow, Object[] args, StartWorkflowOptions options) {
682+
return executor("executeWorkflow").executeWorkflow(regWorkflow, args, options, null, null);
683+
}
684+
623685
/**
624686
* Get a system database record stored by an external service A unique value is stored per
625687
* combination of service, workflowName, and key

transact/src/main/java/dev/dbos/transact/DBOSClient.java

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@
22

33
import dev.dbos.transact.database.SystemDatabase;
44
import dev.dbos.transact.execution.DBOSExecutor;
5-
import dev.dbos.transact.execution.DBOSExecutor.ExecuteWorkflowOptions;
65
import dev.dbos.transact.workflow.ForkOptions;
76
import dev.dbos.transact.workflow.ListWorkflowsInput;
87
import dev.dbos.transact.workflow.StepInfo;
8+
import dev.dbos.transact.workflow.Timeout;
99
import dev.dbos.transact.workflow.WorkflowHandle;
1010
import dev.dbos.transact.workflow.WorkflowState;
1111
import dev.dbos.transact.workflow.WorkflowStatus;
1212
import dev.dbos.transact.workflow.internal.WorkflowStatusInternal;
1313

1414
import java.time.Duration;
15+
import java.time.Instant;
1516
import java.util.List;
1617
import java.util.Objects;
1718
import java.util.Optional;
@@ -89,6 +90,7 @@ public record EnqueueOptions(
8990
String workflowId,
9091
String appVersion,
9192
Duration timeout,
93+
Instant deadline,
9294
String deduplicationId,
9395
Integer priority) {
9496

@@ -117,7 +119,7 @@ public record EnqueueOptions(
117119

118120
/** Construct `EnqueueOptions` with a minimum set of required options */
119121
public EnqueueOptions(String className, String workflowName, String queueName) {
120-
this(workflowName, queueName, className, "", null, null, null, null, null);
122+
this(workflowName, queueName, className, "", null, null, null, null, null, null);
121123
}
122124

123125
/**
@@ -135,6 +137,7 @@ public EnqueueOptions withClassName(String className) {
135137
this.workflowId,
136138
this.appVersion,
137139
this.timeout,
140+
this.deadline,
138141
this.deduplicationId,
139142
this.priority);
140143
}
@@ -155,6 +158,7 @@ public EnqueueOptions withWorkflowId(String workflowId) {
155158
workflowId,
156159
this.appVersion,
157160
this.timeout,
161+
this.deadline,
158162
this.deduplicationId,
159163
this.priority);
160164
}
@@ -175,6 +179,7 @@ public EnqueueOptions withAppVersion(String appVersion) {
175179
this.workflowId,
176180
appVersion,
177181
this.timeout,
182+
this.deadline,
178183
this.deduplicationId,
179184
this.priority);
180185
}
@@ -195,6 +200,28 @@ public EnqueueOptions withTimeout(Duration timeout) {
195200
this.workflowId,
196201
this.appVersion,
197202
timeout,
203+
this.deadline,
204+
this.deduplicationId,
205+
this.priority);
206+
}
207+
208+
/**
209+
* Specify a deadline for the workflow. This is an absolute time, regardless of when the
210+
* workflow starts.
211+
*
212+
* @param deadline Instant after which the workflow will be canceled.
213+
* @return New `EnqueueOptions` with the deadline set
214+
*/
215+
public EnqueueOptions withDeadline(Instant deadline) {
216+
return new EnqueueOptions(
217+
this.workflowName,
218+
this.queueName,
219+
this.className,
220+
this.instanceName,
221+
this.workflowId,
222+
this.appVersion,
223+
this.timeout,
224+
deadline,
198225
this.deduplicationId,
199226
this.priority);
200227
}
@@ -215,6 +242,7 @@ public EnqueueOptions withDeduplicationId(String deduplicationId) {
215242
this.workflowId,
216243
this.appVersion,
217244
this.timeout,
245+
this.deadline,
218246
deduplicationId,
219247
this.priority);
220248
}
@@ -235,6 +263,7 @@ public EnqueueOptions withInstanceName(String instName) {
235263
this.workflowId,
236264
this.appVersion,
237265
this.timeout,
266+
this.deadline,
238267
this.deduplicationId,
239268
this.priority);
240269
}
@@ -254,6 +283,7 @@ public EnqueueOptions withPriority(Integer priority) {
254283
this.workflowId,
255284
this.appVersion,
256285
this.timeout,
286+
this.deadline,
257287
this.deduplicationId,
258288
priority);
259289
}
@@ -288,14 +318,14 @@ public <T, E extends Exception> WorkflowHandle<T, E> enqueueWorkflow(
288318
Objects.requireNonNullElse(options.instanceName(), ""),
289319
null,
290320
args,
291-
new ExecuteWorkflowOptions(
321+
new StartWorkflowOptions(
292322
Objects.requireNonNullElseGet(options.workflowId(), () -> UUID.randomUUID().toString()),
293-
options.timeout(),
294-
null,
323+
Timeout.of(options.timeout()),
295324
Objects.requireNonNull(
296325
options.queueName(), "EnqueueOptions queueName must not be null"),
297326
options.deduplicationId,
298-
options.priority),
327+
options.priority,
328+
options.deadline),
299329
null,
300330
null,
301331
options.appVersion,

0 commit comments

Comments
 (0)