Skip to content

Commit 8f953df

Browse files
committed
Forkoptions
1 parent 582dc4e commit 8f953df

6 files changed

Lines changed: 113 additions & 52 deletions

File tree

src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 4 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import dev.dbos.transact.queue.QueueService;
1515
import dev.dbos.transact.queue.RateLimit;
1616
import dev.dbos.transact.scheduled.SchedulerService;
17+
import dev.dbos.transact.workflow.ForkOptions;
1718
import dev.dbos.transact.workflow.WorkflowHandle;
1819
import org.slf4j.Logger;
1920
import org.slf4j.LoggerFactory;
@@ -371,40 +372,12 @@ public void cancelWorkflow(String workflowId) {
371372
* step are copied over
372373
*
373374
* @param workflowId Original workflow Id
374-
* @param forkedWorkflowId Id of the new workflow
375375
* @param startStep Start execution from this step. Prior steps copied over
376-
* @param applicationVersion The version of the application to run
376+
* @param options Forkoptions forkedWorkflowId, applicationVersion, timeout
377377
* @return handle to the workflow
378378
*/
379-
public WorkflowHandle<?> forkWorkflow(String workflowId, String forkedWorkflowId, int startStep, String applicationVersion) {
380-
return this.dbosExecutor.forkWorkflow(workflowId, forkedWorkflowId, startStep, applicationVersion) ;
381-
}
382-
383-
/**
384-
* Fork the workflow. Re-execute with another Id from the step provided. Steps prior to the provided
385-
* step are copied over
386-
*
387-
* @param workflowId Original workflow Id
388-
* @param startStep Start execution from this step. Prior steps copied over
389-
* @param applicationVersion The version of the application to run
390-
* @return handle to the workflow
391-
*/
392-
public WorkflowHandle<?> forkWorkflow(String workflowId, int startStep, String applicationVersion) {
393-
return this.dbosExecutor.forkWorkflow(workflowId, null, startStep, applicationVersion) ;
394-
}
395-
396-
/**
397-
*
398-
* Fork the workflow. Re-execute with another Id from the step provided. Steps prior to the provided
399-
* step are copied over
400-
*
401-
* @param workflowId Original workflow Id
402-
* @param startStep Start execution from this step. Prior steps copied over
403-
* @return handle to the workflow
404-
*/
405-
406-
public WorkflowHandle<?> forkWorkflow(String workflowId, int startStep) {
407-
return this.dbosExecutor.forkWorkflow(workflowId, null, startStep, null) ;
379+
public WorkflowHandle<?> forkWorkflow(String workflowId, int startStep, ForkOptions options) {
380+
return this.dbosExecutor.forkWorkflow(workflowId, startStep, options) ;
408381
}
409382

410383

src/main/java/dev/dbos/transact/database/SystemDatabase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import dev.dbos.transact.notifications.GetWorkflowEventContext;
1212
import dev.dbos.transact.notifications.NotificationService;
1313
import dev.dbos.transact.queue.Queue;
14+
import dev.dbos.transact.workflow.ForkOptions;
1415
import dev.dbos.transact.workflow.ListWorkflowsInput;
1516
import dev.dbos.transact.workflow.StepInfo;
1617
import dev.dbos.transact.workflow.WorkflowStatus;
@@ -326,12 +327,11 @@ public void resumeWorkflow(String workflowId) {
326327
}
327328

328329
public String forkWorkflow(String originalWorkflowId,
329-
String forkedWorkflowId,
330330
int startStep,
331-
String applicationVersion) {
331+
ForkOptions options) {
332332

333333
try {
334-
return workflowDAO.forkWorkflow(originalWorkflowId, forkedWorkflowId, startStep, applicationVersion) ;
334+
return workflowDAO.forkWorkflow(originalWorkflowId, startStep, options) ;
335335
} catch (SQLException sq) {
336336
throw new DBOSException(ErrorCode.RESUME_WORKFLOW_ERROR.getCode(), sq.getMessage()) ;
337337
}

src/main/java/dev/dbos/transact/database/WorkflowDAO.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import dev.dbos.transact.Constants;
55
import dev.dbos.transact.exceptions.*;
66
import dev.dbos.transact.json.JSONUtil;
7+
import dev.dbos.transact.workflow.ForkOptions;
78
import dev.dbos.transact.workflow.ListWorkflowsInput;
89
import dev.dbos.transact.workflow.WorkflowState;
910
import dev.dbos.transact.workflow.WorkflowStatus;
@@ -756,15 +757,20 @@ public void resumeWorkflow(String workflowId) throws SQLException {
756757
}
757758

758759
public String forkWorkflow(String originalWorkflowId,
759-
String forkedWorkflowId,
760760
int startStep,
761-
String applicationVersion) throws SQLException {
761+
ForkOptions options) throws SQLException {
762762

763+
String forkedWorkflowId = options.getForkedWorkflowId() == null ?
764+
UUID.randomUUID().toString() : options.getForkedWorkflowId() ;
763765

764766
logger.info("Original " + originalWorkflowId + "forked " + forkedWorkflowId) ;
765767

768+
String applicationVersion = options.getApplicationVersion() ;
769+
766770
WorkflowStatus status = getWorkflowStatus(originalWorkflowId) ;
767771

772+
long timeoutMs = options.getTimeoutMS() == 0 ? status.getWorkflowTimeoutMs() : options.getTimeoutMS();
773+
768774
if (status == null) {
769775
throw new NonExistentWorkflowException(originalWorkflowId);
770776
}
@@ -774,7 +780,7 @@ public String forkWorkflow(String originalWorkflowId,
774780

775781
try {
776782
// Create entry for forked workflow
777-
insertForkedWorkflowStatus(connection, forkedWorkflowId, status, applicationVersion);
783+
insertForkedWorkflowStatus(connection, forkedWorkflowId, status, applicationVersion, timeoutMs);
778784

779785
// Copy operation outputs if starting from step > 0
780786
if (startStep > 0) {
@@ -794,11 +800,12 @@ public String forkWorkflow(String originalWorkflowId,
794800
private void insertForkedWorkflowStatus(Connection connection,
795801
String forkedWorkflowId,
796802
WorkflowStatus originalStatus,
797-
String applicationVersion) throws SQLException {
803+
String applicationVersion,
804+
long timeoutMs) throws SQLException {
798805

799806
long workflowDeadlineEpoch = 0 ;
800-
if (originalStatus.getWorkflowTimeoutMs() > 0) {
801-
workflowDeadlineEpoch = System.currentTimeMillis() + originalStatus.getWorkflowTimeoutMs() ;
807+
if (timeoutMs > 0) {
808+
workflowDeadlineEpoch = System.currentTimeMillis() + timeoutMs ;
802809
}
803810

804811
String sql = "INSERT INTO dbos.workflow_status ( " +
@@ -855,11 +862,11 @@ private void copyOperationOutputs(Connection connection,
855862
}
856863
}
857864

858-
public String forkWorkflow(String originalWorkflowId,
865+
/* public String forkWorkflow(String originalWorkflowId,
859866
String forkedWorkflowId,
860867
int startStep) throws SQLException {
861868
return forkWorkflow(originalWorkflowId, forkedWorkflowId, startStep, null);
862-
}
869+
} */
863870

864871
private String getWorkflowStatus(Connection connection, String workflowId) throws SQLException {
865872
String sql = "SELECT status FROM dbos.workflow_status WHERE workflow_uuid = ?";

src/main/java/dev/dbos/transact/execution/DBOSExecutor.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import dev.dbos.transact.queue.Queue;
1313
import dev.dbos.transact.queue.QueueRegistry;
1414
import dev.dbos.transact.queue.QueueService;
15+
import dev.dbos.transact.workflow.ForkOptions;
1516
import dev.dbos.transact.workflow.WorkflowHandle;
1617
import dev.dbos.transact.workflow.WorkflowState;
1718
import dev.dbos.transact.workflow.WorkflowStatus;
@@ -513,7 +514,7 @@ public void cancelWorkflow(String workflowId) {
513514

514515
}
515516

516-
public WorkflowHandle<?> forkWorkflow(String workflowId, String forkedWorkflowId, int startStep, String applicationVersion) {
517+
/* WorkflowHandle<?> forkWorkflow(String workflowId, String forkedWorkflowId, int startStep, String applicationVersion) {
517518
518519
if (forkedWorkflowId == null) {
519520
forkedWorkflowId = UUID.randomUUID().toString();
@@ -529,6 +530,17 @@ public WorkflowHandle<?> forkWorkflow(String workflowId, String forkedWorkflowId
529530
530531
systemDatabase.callFunctionAsStep(forkFunction, "DBOS.forkedWorkflow");
531532
return retrieveWorkflow(newId);
532-
}
533+
} */
534+
535+
public WorkflowHandle<?> forkWorkflow(String workflowId, int startStep, ForkOptions options) {
536+
537+
Supplier<String> forkFunction = () -> {
538+
logger.info(String.format("Forking workflow:%s from step:%d ", workflowId, startStep));
539+
540+
return systemDatabase.forkWorkflow(workflowId, startStep, options);
541+
};
533542

543+
String forkedId = systemDatabase.callFunctionAsStep(forkFunction, "DBOS.forkedWorkflow");
544+
return retrieveWorkflow(forkedId);
545+
}
534546
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package dev.dbos.transact.workflow;
2+
3+
public class ForkOptions {
4+
private final String forkedWorkflowId;
5+
private final String applicationVersion;
6+
private final long timeoutMS;
7+
8+
private ForkOptions(Builder builder) {
9+
this.forkedWorkflowId = builder.forkedWorkflowId;
10+
this.applicationVersion = builder.applicationVersion;
11+
this.timeoutMS = builder.timeoutMS;
12+
}
13+
14+
public String getForkedWorkflowId() {
15+
return forkedWorkflowId;
16+
}
17+
18+
public String getApplicationVersion() {
19+
return applicationVersion;
20+
}
21+
22+
public long getTimeoutMS() {
23+
return timeoutMS;
24+
}
25+
26+
public static Builder builder() {
27+
return new Builder();
28+
}
29+
30+
public static class Builder {
31+
private String forkedWorkflowId;
32+
private String applicationVersion;
33+
private long timeoutMS;
34+
35+
public Builder() {}
36+
37+
public Builder forkedWorkflowId(String forkedWorkflowId) {
38+
this.forkedWorkflowId = forkedWorkflowId;
39+
return this;
40+
}
41+
42+
public Builder applicationVersion(String applicationVersion) {
43+
this.applicationVersion = applicationVersion;
44+
return this;
45+
}
46+
47+
public Builder timeoutMS(long timeoutMS) {
48+
this.timeoutMS = timeoutMS;
49+
return this;
50+
}
51+
52+
public ForkOptions build() {
53+
return new ForkOptions(this);
54+
}
55+
}
56+
57+
@Override
58+
public String toString() {
59+
return "ForkOptions{" +
60+
"forkedWorkflowId='" + forkedWorkflowId + '\'' +
61+
", applicationVersion='" + applicationVersion + '\'' +
62+
", timeoutMS=" + timeoutMS +
63+
'}';
64+
}
65+
}

src/test/java/dev/dbos/transact/workflow/WorkflowMgmtTest.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,8 @@ public void syncCancelResumeTest() throws Exception {
256256
public void forkNonExistent() {
257257

258258
try {
259-
WorkflowHandle<?> rstatHandle = dbos.forkWorkflow("12345", 2);
259+
ForkOptions options = new ForkOptions.Builder().build() ;
260+
WorkflowHandle<?> rstatHandle = dbos.forkWorkflow("12345", 2, options);
260261
fail("An exceptions should have been thrown");
261262
} catch (Throwable t) {
262263
logger.info(t.getClass().getName()) ;
@@ -295,7 +296,8 @@ public void testFork() {
295296

296297
logger.info("First execution done starting fork") ;
297298

298-
WorkflowHandle<?> rstatHandle = dbos.forkWorkflow(workflowId, 0);
299+
ForkOptions foptions = new ForkOptions.Builder().build() ;
300+
WorkflowHandle<?> rstatHandle = dbos.forkWorkflow(workflowId, 0, foptions);
299301
result = (String) rstatHandle.getResult() ;
300302
assertEquals("hellohello", result);
301303
assertEquals(WorkflowState.SUCCESS.name(), rstatHandle.getStatus().getStatus());
@@ -312,7 +314,7 @@ public void testFork() {
312314

313315
logger.info("first fork done . starting 2nd fork ") ;
314316

315-
rstatHandle = dbos.forkWorkflow(workflowId, 2);
317+
rstatHandle = dbos.forkWorkflow(workflowId, 2, foptions);
316318
result = (String) rstatHandle.getResult() ;
317319
assertEquals("hellohello", result);
318320
assertEquals(WorkflowState.SUCCESS.name(), rstatHandle.getStatus().getStatus());
@@ -326,7 +328,7 @@ public void testFork() {
326328

327329
logger.info("Second fork done . starting 3rd fork ") ;
328330

329-
rstatHandle = dbos.forkWorkflow(workflowId, 4);
331+
rstatHandle = dbos.forkWorkflow(workflowId, 4, foptions);
330332
result = (String) rstatHandle.getResult() ;
331333
assertEquals("hellohello", result);
332334
assertEquals(WorkflowState.SUCCESS.name(), rstatHandle.getStatus().getStatus());
@@ -373,7 +375,8 @@ public void testParentChildFork() {
373375

374376
logger.info("First execution done starting fork") ;
375377

376-
WorkflowHandle<?> rstatHandle = dbos.forkWorkflow(workflowId, 0);
378+
ForkOptions foptions = new ForkOptions.Builder().build() ;
379+
WorkflowHandle<?> rstatHandle = dbos.forkWorkflow(workflowId, 0, foptions);
377380
result = (String) rstatHandle.getResult() ;
378381
assertEquals("hellohello", result);
379382
assertEquals(WorkflowState.SUCCESS.name(), rstatHandle.getStatus().getStatus());
@@ -393,7 +396,7 @@ public void testParentChildFork() {
393396

394397
logger.info("First execution done starting 2nd fork");
395398

396-
rstatHandle = dbos.forkWorkflow(workflowId, 3);
399+
rstatHandle = dbos.forkWorkflow(workflowId, 3, foptions);
397400
result = (String) rstatHandle.getResult() ;
398401
assertEquals("hellohello", result);
399402
assertEquals(WorkflowState.SUCCESS.name(), rstatHandle.getStatus().getStatus());
@@ -415,7 +418,7 @@ public void testParentChildFork() {
415418

416419
logger.info("First execution done starting 2nd fork");
417420

418-
rstatHandle = dbos.forkWorkflow(workflowId, 4);
421+
rstatHandle = dbos.forkWorkflow(workflowId, 4, foptions);
419422
result = (String) rstatHandle.getResult() ;
420423
assertEquals("hellohello", result);
421424
assertEquals(WorkflowState.SUCCESS.name(), rstatHandle.getStatus().getStatus());
@@ -470,7 +473,8 @@ public void testParentChildAsyncFork() {
470473

471474
logger.info("First execution done starting fork");
472475

473-
WorkflowHandle<?> rstatHandle = dbos.forkWorkflow(workflowId, 3);
476+
ForkOptions foptions = new ForkOptions.Builder().build() ;
477+
WorkflowHandle<?> rstatHandle = dbos.forkWorkflow(workflowId, 3, foptions);
474478
result = (String) rstatHandle.getResult();
475479

476480
assertEquals("hellohello", result);

0 commit comments

Comments
 (0)