Skip to content

Commit 97088a9

Browse files
authored
StartWorkflow/EnqueueWorkflow (#25)
1 parent e6e4a0b commit 97088a9

21 files changed

Lines changed: 278 additions & 137 deletions

src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
import dev.dbos.transact.config.DBOSConfig;
44
import dev.dbos.transact.database.SystemDatabase;
55
import dev.dbos.transact.execution.DBOSExecutor;
6+
import dev.dbos.transact.execution.WorkflowFunction;
67
import dev.dbos.transact.execution.RecoveryService;
78
import dev.dbos.transact.http.HttpServer;
89
import dev.dbos.transact.http.controllers.AdminController;
910
import dev.dbos.transact.interceptor.AsyncInvocationHandler;
1011
import dev.dbos.transact.interceptor.QueueInvocationHandler;
11-
import dev.dbos.transact.interceptor.TransactInvocationHandler;
1212
import dev.dbos.transact.interceptor.UnifiedInvocationHandler;
1313
import dev.dbos.transact.migrations.MigrationManager;
1414
import dev.dbos.transact.notifications.NotificationService;
@@ -21,7 +21,6 @@
2121
import org.slf4j.Logger;
2222
import org.slf4j.LoggerFactory;
2323

24-
import java.util.concurrent.CountDownLatch;
2524
import java.util.concurrent.atomic.AtomicBoolean;
2625

2726
public class DBOS {
@@ -273,7 +272,7 @@ public void shutdown() {
273272
}
274273
}
275274

276-
public static WorkflowHandle<?> retrieveWorkflow(String workflowId) {
275+
public static <T> WorkflowHandle<T> retrieveWorkflow(String workflowId) {
277276
return DBOS.getInstance().dbosExecutor.retrieveWorkflow(workflowId);
278277
}
279278

@@ -355,7 +354,7 @@ public void sleep(float seconds) {
355354
* @param workflowId id of the workflow
356355
* @return A handle to the workflow
357356
*/
358-
public WorkflowHandle<?> resumeWorkflow(String workflowId) {
357+
public <T> WorkflowHandle<T> resumeWorkflow(String workflowId) {
359358
return this.dbosExecutor.resumeWorkflow(workflowId) ;
360359
}
361360

@@ -380,10 +379,23 @@ public void cancelWorkflow(String workflowId) {
380379
* @param options {@link ForkOptions} containing forkedWorkflowId, applicationVersion, timeout
381380
* @return handle to the workflow
382381
*/
383-
public WorkflowHandle<?> forkWorkflow(String workflowId, int startStep, ForkOptions options) {
382+
public <T> WorkflowHandle<T> forkWorkflow(String workflowId, int startStep, ForkOptions options) {
384383
return this.dbosExecutor.forkWorkflow(workflowId, startStep, options) ;
385384
}
386385

386+
/**
387+
* Start a workflow asynchronously.
388+
* If a queue is specified with DBOSOptions, the workflow is queued.
389+
*
390+
* @param func A function annotated with @Workflow
391+
* @return handle {@link WorkflowHandle} to the workflow
392+
* @param <T> type returned by the function
393+
*/
394+
395+
public <T> WorkflowHandle<T> startWorkflow(WorkflowFunction<T> func) {
396+
return this.dbosExecutor.startWorkflow(func) ;
397+
}
398+
387399

388400
}
389401

src/main/java/dev/dbos/transact/context/DBOSContext.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,5 +163,17 @@ public Queue getQueue() {
163163
public long getWorkflowTimeoutMs() {
164164
return workflowTimeoutMs ;
165165
}
166+
167+
public DBOSContext copyWithAsync() {
168+
return new DBOSContext(workflowId, functionId, parentWorkflowId, parentFunctionId, inWorkflow, true, queue, workflowTimeoutMs);
169+
}
170+
171+
public DBOSContext copyWithQueue(Queue q) {
172+
return new DBOSContext(workflowId, functionId, parentWorkflowId, parentFunctionId, inWorkflow, async, q, workflowTimeoutMs);
173+
}
174+
175+
public DBOSContext copyWithWorkflowId(String id) {
176+
return new DBOSContext(id, functionId, parentWorkflowId, parentFunctionId, inWorkflow, async, queue, workflowTimeoutMs);
177+
}
166178
}
167179

src/main/java/dev/dbos/transact/context/DBOSOptions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ public Builder(String workflowId) {
4646
this.workflowId = workflowId;
4747
}
4848

49-
public Builder async() {
49+
/* public Builder async() {
5050
this.async = true;
5151
return this;
52-
}
52+
} */
5353

5454
public Builder queue(Queue queue) {
5555
this.queue = queue;

src/main/java/dev/dbos/transact/execution/DBOSExecutor.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import dev.dbos.transact.exceptions.*;
1111
import dev.dbos.transact.json.JSONUtil;
1212
import dev.dbos.transact.queue.Queue;
13-
import dev.dbos.transact.queue.QueueRegistry;
1413
import dev.dbos.transact.queue.QueueService;
1514
import dev.dbos.transact.workflow.ForkOptions;
1615
import dev.dbos.transact.workflow.WorkflowHandle;
@@ -25,7 +24,6 @@
2524

2625
import java.lang.reflect.InvocationTargetException;
2726
import java.lang.reflect.Method;
28-
import java.sql.SQLException;
2927
import java.util.Optional;
3028
import java.util.UUID;
3129
import java.util.concurrent.*;
@@ -150,7 +148,7 @@ public <T> T syncWorkflow(String workflowName,
150148
String targetClassName,
151149
Object target,
152150
Object[] args,
153-
WorkflowFunction function,
151+
WorkflowFunctionReflect function,
154152
String workflowId) throws Throwable {
155153

156154
String wfid = workflowId ;
@@ -213,7 +211,7 @@ public <T> T syncWorkflow(String workflowName,
213211
<T> T runAndSaveResult(
214212
Object target,
215213
Object[] args,
216-
WorkflowFunction function,
214+
WorkflowFunctionReflect function,
217215
String workflowId) throws Throwable {
218216

219217
try {
@@ -249,7 +247,7 @@ public <T> WorkflowHandle<T> submitWorkflow(String workflowName,
249247
String targetClassName,
250248
Object target,
251249
Object[] args,
252-
WorkflowFunction function) throws Throwable {
250+
WorkflowFunctionReflect function) throws Throwable {
253251

254252
DBOSContext ctx = DBOSContextHolder.get();
255253
String workflowId = ctx.getWorkflowId() ;
@@ -332,7 +330,6 @@ public void enqueueWorkflow(String workflowName,
332330
) throws Throwable {
333331

334332

335-
336333
DBOSContext ctx = DBOSContextHolder.get();
337334
String wfid = ctx.getWorkflowId() ;
338335

@@ -363,7 +360,7 @@ public <T> T runStep(String stepName,
363360
int maxAttempts,
364361
float backOffRate,
365362
Object[] args,
366-
DBOSFunction<T> function
363+
WorkflowFunction<T> function
367364
) throws Throwable {
368365

369366

@@ -435,7 +432,7 @@ public <T> T runStep(String stepName,
435432
* Retrieve the workflowHandle for the workflowId
436433
*
437434
*/
438-
public WorkflowHandle<?> retrieveWorkflow(String workflowId) {
435+
public <R> WorkflowHandle<R> retrieveWorkflow(String workflowId) {
439436
return new WorkflowHandleDBPoll(workflowId, systemDatabase) ;
440437
}
441438

@@ -488,7 +485,7 @@ public void sleep(float seconds) {
488485

489486
}
490487

491-
public WorkflowHandle<?> resumeWorkflow(String workflowId) {
488+
public <T> WorkflowHandle<T> resumeWorkflow(String workflowId) {
492489

493490
Supplier<Void> resumeFunction = () -> {
494491
logger.info("Resuming workflow: ", workflowId);
@@ -512,7 +509,7 @@ public void cancelWorkflow(String workflowId) {
512509

513510
}
514511

515-
public WorkflowHandle<?> forkWorkflow(String workflowId, int startStep, ForkOptions options) {
512+
public <T> WorkflowHandle<T> forkWorkflow(String workflowId, int startStep, ForkOptions options) {
516513

517514
Supplier<String> forkFunction = () -> {
518515
logger.info(String.format("Forking workflow:%s from step:%d ", workflowId, startStep));
@@ -523,4 +520,29 @@ public WorkflowHandle<?> forkWorkflow(String workflowId, int startStep, ForkOpti
523520
String forkedId = systemDatabase.callFunctionAsStep(forkFunction, "DBOS.forkedWorkflow");
524521
return retrieveWorkflow(forkedId);
525522
}
523+
524+
public <T> WorkflowHandle<T> startWorkflow(WorkflowFunction<T> func) {
525+
DBOSContext oldctx = DBOSContextHolder.get();
526+
DBOSContext newCtx = oldctx ;
527+
528+
if (newCtx.getWorkflowId() == null) {
529+
newCtx = newCtx.copyWithWorkflowId(UUID.randomUUID().toString()) ;
530+
}
531+
532+
if (newCtx.getQueue() == null) {
533+
newCtx = oldctx.copyWithAsync() ;
534+
}
535+
536+
try {
537+
DBOSContextHolder.set(newCtx);
538+
func.execute();
539+
return retrieveWorkflow(newCtx.getWorkflowId());
540+
} catch(Throwable t) {
541+
throw new DBOSException(UNEXPECTED.getCode(), t.getMessage());
542+
} finally {
543+
DBOSContextHolder.set(oldctx);
544+
}
545+
546+
}
547+
526548
}

src/main/java/dev/dbos/transact/execution/DBOSFunction.java

Lines changed: 0 additions & 6 deletions
This file was deleted.
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package dev.dbos.transact.execution;
22

33
@FunctionalInterface
4-
public interface WorkflowFunction {
5-
Object invoke(Object target, Object[] args) throws Exception;
4+
public interface WorkflowFunction<T> {
5+
T execute() throws Throwable;
66
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package dev.dbos.transact.execution;
2+
3+
@FunctionalInterface
4+
public interface WorkflowFunction0<R> {
5+
R run() ;
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package dev.dbos.transact.execution;
2+
3+
@FunctionalInterface
4+
public interface WorkflowFunction1<T1, R> {
5+
R run(T1 arg1);
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package dev.dbos.transact.execution;
2+
3+
@FunctionalInterface
4+
public interface WorkflowFunction2<T1, T2, R> {
5+
R run(T1 arg1, T2 arg2);
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package dev.dbos.transact.execution;
2+
3+
@FunctionalInterface
4+
public interface WorkflowFunctionReflect {
5+
Object invoke(Object target, Object[] args) throws Exception;
6+
}

0 commit comments

Comments
 (0)