Skip to content

Commit cb957eb

Browse files
authored
Update WF registration API (#382)
* Adds DBOSIntegration.registerWorkflow overload that takes direct parameters instead of @workflow annotation instance * **BREAKING CHANGE**: changed DBOSIntegration.registerWorkflow overloads to return the registered workflow * Modifled DBOSIntegration getRegisteredWorkflows/getRegisteredWorkflow/getRegisteredWorkflowInstances to work pre and post launch * **BREAKING CHANGE**: removed DBOS.registerWorkflow method. Call DBOSIntegration.registerWorkflow instead fixes #381
1 parent 831a6d4 commit cb957eb

4 files changed

Lines changed: 119 additions & 65 deletions

File tree

transact/src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828

2929
import java.io.IOException;
3030
import java.io.InputStream;
31-
import java.lang.reflect.Method;
3231
import java.time.Duration;
3332
import java.time.Instant;
3433
import java.util.Arrays;
@@ -84,10 +83,7 @@ public DBOS(@NonNull DBOSConfig config) {
8483
this.config = new DBOSConfig(config);
8584
this.integration =
8685
new DBOSIntegration(
87-
this.config,
88-
dbosExecutor::get,
89-
this::registerLifecycleListener,
90-
this::registerWorkflow);
86+
this.config, this.workflowRegistry, dbosExecutor::get, this::registerLifecycleListener);
9187
}
9288

9389
/**
@@ -210,36 +206,14 @@ public void registerQueues(@NonNull Queue... queues) {
210206
var wfTag = method.getAnnotation(Workflow.class);
211207
if (wfTag != null) {
212208
method.setAccessible(true); // In case it's not public
213-
workflowRegistry.registerWorkflow(wfTag, target, method, instanceName);
209+
integration.registerWorkflow(wfTag, target, method, instanceName);
214210
}
215211
}
216212

217213
return DBOSInvocationHandler.createProxy(
218214
interfaceClass, target, instanceName, dbosExecutor::get);
219215
}
220216

221-
/**
222-
* Register a workflow method with DBOS. This method is used internally by the proxy registration
223-
* process and should not typically be called directly by application code.
224-
*
225-
* @param wfTag the Workflow annotation containing workflow configuration
226-
* @param target the object instance containing the workflow method
227-
* @param method the Method representing the workflow function
228-
* @param instanceName optional instance name for the workflow (can be null)
229-
* @throws IllegalStateException if called after DBOS is launched
230-
*/
231-
private void registerWorkflow(
232-
@NonNull Workflow wfTag,
233-
@NonNull Object target,
234-
@NonNull Method method,
235-
@Nullable String instanceName) {
236-
if (dbosExecutor.get() != null) {
237-
throw new IllegalStateException("Cannot register workflow after DBOS is launched");
238-
}
239-
240-
workflowRegistry.registerWorkflow(wfTag, target, method, instanceName);
241-
}
242-
243217
/**
244218
* Check if the provided target object contains any methods annotated with @Workflow.
245219
*

transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import java.time.ZoneOffset;
5656
import java.util.ArrayList;
5757
import java.util.Collection;
58-
import java.util.Collections;
5958
import java.util.Iterator;
6059
import java.util.List;
6160
import java.util.Map;
@@ -169,8 +168,8 @@ public void start(
169168
if (isRunning.compareAndSet(false, true)) {
170169
logger.info("DBOS Executor starting");
171170

172-
this.workflowMap = Collections.unmodifiableMap(workflowMap);
173-
this.instanceMap = Collections.unmodifiableMap(instanceMap);
171+
this.workflowMap = workflowMap;
172+
this.instanceMap = instanceMap;
174173
this.queueMap =
175174
queues.stream().collect(Collectors.toUnmodifiableMap(Queue::name, queue -> queue));
176175
this.listeners = listenerSet;

transact/src/main/java/dev/dbos/transact/internal/DBOSIntegration.java

Lines changed: 104 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
import dev.dbos.transact.execution.DBOSLifecycleListener;
88
import dev.dbos.transact.execution.RegisteredWorkflow;
99
import dev.dbos.transact.execution.RegisteredWorkflowInstance;
10+
import dev.dbos.transact.workflow.SerializationStrategy;
1011
import dev.dbos.transact.workflow.Workflow;
1112
import dev.dbos.transact.workflow.WorkflowHandle;
1213

1314
import java.lang.reflect.Method;
1415
import java.util.Collection;
16+
import java.util.Collections;
1517
import java.util.Objects;
1618
import java.util.Optional;
1719
import java.util.function.Consumer;
@@ -31,25 +33,30 @@
3133
*/
3234
public class DBOSIntegration {
3335

36+
/**
37+
* Callback used during workflow registration to process each discovered workflow method.
38+
* Implementations receive the {@link Workflow} annotation, the target object, the reflective
39+
* {@link Method}, and the optional instance name.
40+
*/
3441
@FunctionalInterface
3542
public interface RegisteredWorkflowConsumer {
3643
void register(Workflow wfTag, Object target, Method method, String instanceName);
3744
}
3845

3946
private final DBOSConfig config;
47+
private final WorkflowRegistry workflowRegistry;
4048
private final Supplier<DBOSExecutor> executorSupplier;
4149
private final Consumer<DBOSLifecycleListener> listenerConsumer;
42-
private final RegisteredWorkflowConsumer workflowConsumer;
4350

4451
public DBOSIntegration(
4552
@NonNull DBOSConfig config,
53+
@NonNull WorkflowRegistry workflowRegistry,
4654
@NonNull Supplier<DBOSExecutor> executorSupplier,
47-
@NonNull Consumer<DBOSLifecycleListener> lifecycleConsumer,
48-
@NonNull RegisteredWorkflowConsumer workflowConsumer) {
55+
@NonNull Consumer<DBOSLifecycleListener> lifecycleConsumer) {
4956
this.config = Objects.requireNonNull(config);
57+
this.workflowRegistry = Objects.requireNonNull(workflowRegistry);
5058
this.executorSupplier = Objects.requireNonNull(executorSupplier);
5159
this.listenerConsumer = Objects.requireNonNull(lifecycleConsumer);
52-
this.workflowConsumer = Objects.requireNonNull(workflowConsumer);
5360
}
5461

5562
private DBOSExecutor executor(String caller) {
@@ -61,14 +68,19 @@ private DBOSExecutor executor(String caller) {
6168
return exec;
6269
}
6370

71+
/**
72+
* Returns the DBOS configuration supplied at construction time.
73+
*
74+
* @return the active {@link DBOSConfig}
75+
*/
6476
public DBOSConfig config() {
6577
return this.config;
6678
}
6779

6880
/**
69-
* Register a lifecycle listener that receives callbacks when DBOS is launched or shut down
81+
* Register a lifecycle listener that receives callbacks when DBOS is launched or shut down.
7082
*
71-
* @param listener
83+
* @param listener the listener to register; must not be {@code null}
7284
*/
7385
public void registerLifecycleListener(@NonNull DBOSLifecycleListener listener) {
7486
listenerConsumer.accept(listener);
@@ -84,12 +96,61 @@ public void registerLifecycleListener(@NonNull DBOSLifecycleListener listener) {
8496
* @param instanceName optional instance name for the workflow (can be null)
8597
* @throws IllegalStateException if called after DBOS is launched
8698
*/
87-
public void registerWorkflow(
99+
public RegisteredWorkflow registerWorkflow(
88100
@NonNull Workflow wfTag,
89101
@NonNull Object target,
90102
@NonNull Method method,
91103
@Nullable String instanceName) {
92-
workflowConsumer.register(wfTag, target, method, instanceName);
104+
105+
var workflowName = WorkflowRegistry.getWorkflowName(wfTag, method);
106+
var className = WorkflowRegistry.getWorkflowClassName(target);
107+
108+
return registerWorkflow(
109+
workflowName,
110+
className,
111+
instanceName,
112+
target,
113+
method,
114+
wfTag.maxRecoveryAttempts(),
115+
wfTag.serializationStrategy());
116+
}
117+
118+
/**
119+
* Register a workflow method with DBOS using explicit field values rather than deriving them from
120+
* a {@link Workflow} annotation. Prefer {@link #registerWorkflow(Workflow, Object, Method,
121+
* String)} unless you need to supply names or options that differ from the annotation.
122+
*
123+
* @param workflowName logical name of the workflow
124+
* @param className name of the class that declares the workflow method
125+
* @param instanceName optional instance name distinguishing multiple registrations of the same
126+
* class; may be {@code null}
127+
* @param target the object instance on which the method will be invoked
128+
* @param method the workflow {@link Method}
129+
* @param maxRecoveryAttempts maximum number of recovery attempts; {@code null} uses the default
130+
* @param serializationStrategy strategy used to serialize and deserialize workflow arguments and
131+
* return values; {@code null} uses the default
132+
* @throws IllegalStateException if called after DBOS is launched
133+
*/
134+
public RegisteredWorkflow registerWorkflow(
135+
@NonNull String workflowName,
136+
@NonNull String className,
137+
@Nullable String instanceName,
138+
@NonNull Object target,
139+
@NonNull Method method,
140+
@Nullable Integer maxRecoveryAttempts,
141+
@Nullable SerializationStrategy serializationStrategy) {
142+
if (executorSupplier.get() != null) {
143+
throw new IllegalStateException("Cannot register workflow after DBOS is launched");
144+
}
145+
146+
return workflowRegistry.registerWorkflow(
147+
workflowName,
148+
className,
149+
instanceName,
150+
target,
151+
method,
152+
maxRecoveryAttempts,
153+
serializationStrategy);
93154
}
94155

95156
/**
@@ -137,7 +198,11 @@ public Object runWorkflow(
137198
* @return list of all registered workflow methods
138199
*/
139200
public @NonNull Collection<RegisteredWorkflow> getRegisteredWorkflows() {
140-
return executor("getRegisteredWorkflows").getRegisteredWorkflows();
201+
var executor = executorSupplier.get();
202+
if (executor != null) {
203+
return executor.getRegisteredWorkflows();
204+
}
205+
return Collections.unmodifiableCollection(workflowRegistry.getWorkflowSnapshot().values());
141206
}
142207

143208
/**
@@ -146,15 +211,21 @@ public Object runWorkflow(
146211
* @return list of all class instances containing registered workflow methods
147212
*/
148213
public @NonNull Collection<RegisteredWorkflowInstance> getRegisteredWorkflowInstances() {
149-
return executor("getRegisteredWorkflowInstances").getRegisteredWorkflowInstances();
214+
var executor = executorSupplier.get();
215+
if (executor != null) {
216+
return executor.getRegisteredWorkflowInstances();
217+
}
218+
return Collections.unmodifiableCollection(workflowRegistry.getInstanceSnapshot().values());
150219
}
151220

152221
/**
153-
* Finds a registered workflow by its workflow name, class name, and instance name.
222+
* Finds a registered workflow by its workflow name and class name, using the default (empty)
223+
* instance name. Equivalent to calling {@link #getRegisteredWorkflow(String, String, String)}
224+
* with an empty string.
154225
*
155226
* @param workflowName the name of the workflow
156227
* @param className the name of the class containing the workflow
157-
* @return an Optional containing the RegisteredWorkflow if found, otherwise empty
228+
* @return an {@link Optional} containing the {@link RegisteredWorkflow} if found, otherwise empty
158229
*/
159230
public Optional<RegisteredWorkflow> getRegisteredWorkflow(
160231
@NonNull String workflowName, @NonNull String className) {
@@ -171,31 +242,38 @@ public Optional<RegisteredWorkflow> getRegisteredWorkflow(
171242
*/
172243
public Optional<RegisteredWorkflow> getRegisteredWorkflow(
173244
@NonNull String workflowName, @NonNull String className, @NonNull String instanceName) {
174-
return executor("getRegisteredWorkflow")
175-
.getRegisteredWorkflow(workflowName, className, instanceName);
245+
var executor = executorSupplier.get();
246+
if (executor != null) {
247+
return executor.getRegisteredWorkflow(workflowName, className, instanceName);
248+
}
249+
var fqName = RegisteredWorkflow.fullyQualifiedName(workflowName, className, instanceName);
250+
return Optional.ofNullable(workflowRegistry.getWorkflowSnapshot().get(fqName));
176251
}
177252

178253
/**
179-
* Get a system database record stored by an external service A unique value is stored per
180-
* combination of service, workflowName, and key
254+
* Get a system database record stored by an external service. A unique value is stored per
255+
* combination of service, workflowName, and key.
181256
*
182-
* @param service Identity of the service maintaining the record
183-
* @param workflowName Fully qualified name of the workflow
184-
* @param key Key assigned within the service+workflow
185-
* @return Optional containing the value associated with the service+workflow+key combination, or
186-
* empty if not found
257+
* @param service identity of the service maintaining the record
258+
* @param workflowName fully qualified name of the workflow
259+
* @param key key assigned within the service+workflow scope
260+
* @return an {@link Optional} containing the value associated with the service+workflow+key
261+
* combination, or empty if not found
262+
* @throws IllegalStateException if DBOS has not been launched
187263
*/
188264
public Optional<ExternalState> getExternalState(String service, String workflowName, String key) {
189265
return executor("getExternalState").getExternalState(service, workflowName, key);
190266
}
191267

192268
/**
193-
* Insert or update a system database record stored by an external service A timestamped unique
194-
* value is stored per combination of service, workflowName, and key
269+
* Insert or update a system database record stored by an external service. A timestamped unique
270+
* value is stored per combination of service, workflowName, and key.
195271
*
196-
* @param state ExternalState containing the service, workflow, key, and value to store
197-
* @return Value associated with the service+workflow+key combination, in case the stored value
198-
* already had a higher version or timestamp
272+
* @param state the {@link ExternalState} containing the service, workflow, key, and value to
273+
* store
274+
* @return the value associated with the service+workflow+key combination — may differ from the
275+
* supplied value if the existing record already had a higher version or timestamp
276+
* @throws IllegalStateException if DBOS has not been launched
199277
*/
200278
public ExternalState upsertExternalState(ExternalState state) {
201279
return executor("upsertExternalState").upsertExternalState(state);

transact/src/main/java/dev/dbos/transact/internal/WorkflowRegistry.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import dev.dbos.transact.execution.RegisteredWorkflow;
44
import dev.dbos.transact.execution.RegisteredWorkflowInstance;
5+
import dev.dbos.transact.workflow.SerializationStrategy;
56
import dev.dbos.transact.workflow.Workflow;
67
import dev.dbos.transact.workflow.WorkflowClassName;
78

@@ -44,29 +45,31 @@ public void registerInstance(@Nullable String instanceName, @NonNull Object targ
4445
}
4546
}
4647

47-
public void registerWorkflow(
48-
@NonNull Workflow wfTag,
48+
public RegisteredWorkflow registerWorkflow(
49+
@NonNull String workflowName,
50+
@NonNull String className,
51+
@Nullable String instanceName,
4952
@NonNull Object target,
5053
@NonNull Method method,
51-
@Nullable String instanceName) {
52-
53-
var workflowName = getWorkflowName(wfTag, method);
54-
var className = getWorkflowClassName(target);
54+
@Nullable Integer maxRecoveryAttempts,
55+
@Nullable SerializationStrategy serializationStrategy) {
5556
var fqName = RegisteredWorkflow.fullyQualifiedName(workflowName, className, instanceName);
57+
5658
var regWorkflow =
5759
new RegisteredWorkflow(
5860
workflowName,
5961
className,
6062
instanceName,
6163
target,
6264
method,
63-
wfTag.maxRecoveryAttempts(),
64-
wfTag.serializationStrategy());
65+
Objects.requireNonNullElse(maxRecoveryAttempts, -1),
66+
Objects.requireNonNullElse(serializationStrategy, SerializationStrategy.DEFAULT));
6567

6668
var previous = wfRegistry.putIfAbsent(fqName, regWorkflow);
6769
if (previous != null) {
6870
throw new IllegalStateException("Workflow already registered with name: " + fqName);
6971
}
72+
return regWorkflow;
7073
}
7174

7275
public Map<String, RegisteredWorkflow> getWorkflowSnapshot() {

0 commit comments

Comments
 (0)