Skip to content

Commit d669623

Browse files
authored
Fork and restart (#19)
2 parents 76b582f + 4855276 commit d669623

10 files changed

Lines changed: 725 additions & 8 deletions

File tree

src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import dev.dbos.transact.queue.QueueService;
1515
import dev.dbos.transact.queue.RateLimit;
1616
import dev.dbos.transact.scheduled.SchedulerService;
17+
import dev.dbos.transact.workflow.ForkOptions;
1718
import dev.dbos.transact.workflow.WorkflowHandle;
1819
import org.slf4j.Logger;
1920
import org.slf4j.LoggerFactory;
@@ -365,5 +366,20 @@ public WorkflowHandle<?> resumeWorkflow(String workflowId) {
365366
public void cancelWorkflow(String workflowId) {
366367
this.dbosExecutor.cancelWorkflow(workflowId);
367368
}
369+
370+
/**
371+
* Fork the workflow. Re-execute with another Id from the step provided. Steps prior to the provided
372+
* step are copied over
373+
*
374+
* @param workflowId Original workflow Id
375+
* @param startStep Start execution from this step. Prior steps copied over
376+
* @param options {@link ForkOptions} containing forkedWorkflowId, applicationVersion, timeout
377+
* @return handle to the workflow
378+
*/
379+
public WorkflowHandle<?> forkWorkflow(String workflowId, int startStep, ForkOptions options) {
380+
return this.dbosExecutor.forkWorkflow(workflowId, startStep, options) ;
381+
}
382+
383+
368384
}
369385

src/main/java/dev/dbos/transact/database/SystemDatabase.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import dev.dbos.transact.notifications.GetWorkflowEventContext;
1212
import dev.dbos.transact.notifications.NotificationService;
1313
import dev.dbos.transact.queue.Queue;
14+
import dev.dbos.transact.workflow.ForkOptions;
1415
import dev.dbos.transact.workflow.ListWorkflowsInput;
1516
import dev.dbos.transact.workflow.StepInfo;
1617
import dev.dbos.transact.workflow.WorkflowStatus;
@@ -239,6 +240,16 @@ public void recordChildWorkflow(String parentId,
239240
workflowDAO.recordChildWorkflow(parentId, childId, functionId, functionName);
240241
}
241242

243+
public Optional<String> checkChildWorkflow(String workflowUuid, int functionId) {
244+
245+
try {
246+
return workflowDAO.checkChildWorkflow(workflowUuid, functionId) ;
247+
} catch (SQLException sq) {
248+
throw new DBOSException(UNEXPECTED.getCode(), sq.getMessage());
249+
}
250+
251+
}
252+
242253
public void send(String workflowId, int functionId, String destinationId,
243254
Object message, String topic) {
244255

@@ -315,6 +326,17 @@ public void resumeWorkflow(String workflowId) {
315326

316327
}
317328

329+
public String forkWorkflow(String originalWorkflowId,
330+
int startStep,
331+
ForkOptions options) {
332+
333+
try {
334+
return workflowDAO.forkWorkflow(originalWorkflowId, startStep, options) ;
335+
} catch (SQLException sq) {
336+
throw new DBOSException(ErrorCode.RESUME_WORKFLOW_ERROR.getCode(), sq.getMessage()) ;
337+
}
338+
}
339+
318340

319341
public <T> T callFunctionAsStep(Supplier<T> fn, String functionName) {
320342
DBOSContext ctx = DBOSContextHolder.get();
@@ -350,7 +372,12 @@ public <T> T callFunctionAsStep(Supplier<T> fn, String functionName) {
350372
StepResult r = new StepResult(ctx.getWorkflowId(), nextFuncId, functionName, null, jsonError);
351373
stepsDAO.recordStepResultTxn(r);
352374
}
353-
throw new DBOSException(UNEXPECTED.getCode(), "Function execution failed: " + functionName, e);
375+
376+
if ( e instanceof NonExistentWorkflowException) {
377+
throw e;
378+
} else {
379+
throw new DBOSException(UNEXPECTED.getCode(), "Function execution failed: " + functionName, e);
380+
}
354381
}
355382

356383
// If we're in a workflow, record the successful result

src/main/java/dev/dbos/transact/database/WorkflowDAO.java

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import dev.dbos.transact.Constants;
55
import dev.dbos.transact.exceptions.*;
66
import dev.dbos.transact.json.JSONUtil;
7+
import dev.dbos.transact.workflow.ForkOptions;
78
import dev.dbos.transact.workflow.ListWorkflowsInput;
89
import dev.dbos.transact.workflow.WorkflowState;
910
import dev.dbos.transact.workflow.WorkflowStatus;
@@ -658,6 +659,28 @@ public void recordChildWorkflow(String parentId,
658659

659660
}
660661

662+
public Optional<String> checkChildWorkflow(String workflowUuid, int functionId) throws SQLException {
663+
String sql = "SELECT child_workflow_id " +
664+
" FROM dbos.operation_outputs " +
665+
"WHERE workflow_uuid = ? AND function_id = ? " ;
666+
667+
668+
try (Connection connection = dataSource.getConnection();
669+
PreparedStatement stmt = connection.prepareStatement(sql)) {
670+
671+
stmt.setString(1, workflowUuid);
672+
stmt.setInt(2, functionId);
673+
674+
try (ResultSet rs = stmt.executeQuery()) {
675+
if (rs.next()) {
676+
String childWorkflowId = rs.getString("child_workflow_id");
677+
return childWorkflowId != null ? Optional.of(childWorkflowId) : Optional.empty();
678+
}
679+
return Optional.empty();
680+
}
681+
}
682+
}
683+
661684
public void cancelWorkflow(String workflowId) throws SQLException {
662685

663686
try (Connection conn = dataSource.getConnection()) {
@@ -733,6 +756,118 @@ public void resumeWorkflow(String workflowId) throws SQLException {
733756
}
734757
}
735758

759+
public String forkWorkflow(String originalWorkflowId,
760+
int startStep,
761+
ForkOptions options) throws SQLException {
762+
763+
String forkedWorkflowId = options.getForkedWorkflowId() == null ?
764+
UUID.randomUUID().toString() : options.getForkedWorkflowId() ;
765+
766+
logger.info("Original " + originalWorkflowId + "forked " + forkedWorkflowId) ;
767+
768+
String applicationVersion = options.getApplicationVersion() ;
769+
770+
WorkflowStatus status = getWorkflowStatus(originalWorkflowId) ;
771+
772+
long timeoutMs = options.getTimeoutMS() == 0 ? status.getWorkflowTimeoutMs() : options.getTimeoutMS();
773+
774+
if (status == null) {
775+
throw new NonExistentWorkflowException(originalWorkflowId);
776+
}
777+
778+
try (Connection connection = dataSource.getConnection()) {
779+
connection.setAutoCommit(false);
780+
781+
try {
782+
// Create entry for forked workflow
783+
insertForkedWorkflowStatus(connection, forkedWorkflowId, status, applicationVersion, timeoutMs);
784+
785+
// Copy operation outputs if starting from step > 0
786+
if (startStep > 0) {
787+
copyOperationOutputs(connection, originalWorkflowId, forkedWorkflowId, startStep);
788+
}
789+
790+
connection.commit();
791+
return forkedWorkflowId;
792+
793+
} catch (SQLException e) {
794+
connection.rollback();
795+
throw e;
796+
}
797+
}
798+
}
799+
800+
private void insertForkedWorkflowStatus(Connection connection,
801+
String forkedWorkflowId,
802+
WorkflowStatus originalStatus,
803+
String applicationVersion,
804+
long timeoutMs) throws SQLException {
805+
806+
long workflowDeadlineEpoch = 0 ;
807+
if (timeoutMs > 0) {
808+
workflowDeadlineEpoch = System.currentTimeMillis() + timeoutMs ;
809+
}
810+
811+
String sql = "INSERT INTO dbos.workflow_status ( " +
812+
" workflow_uuid, status, name, class_name, config_name, application_version, application_id, " +
813+
" authenticated_user, authenticated_roles, assumed_role, queue_name, inputs, workflow_deadline_epoch_ms, workflow_timeout_ms " +
814+
" ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " ;
815+
816+
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
817+
stmt.setString(1, forkedWorkflowId);
818+
stmt.setString(2, WorkflowState.ENQUEUED.name());
819+
stmt.setString(3, originalStatus.getName());
820+
stmt.setString(4, originalStatus.getClassName());
821+
stmt.setString(5, originalStatus.getConfigName());
822+
823+
// Use provided application version or fall back to original
824+
String appVersion = applicationVersion != null ?
825+
applicationVersion : originalStatus.getAppVersion();
826+
stmt.setString(6, appVersion);
827+
828+
stmt.setString(7, originalStatus.getAppId());
829+
stmt.setString(8, originalStatus.getAuthenticatedUser());
830+
stmt.setString(9, JSONUtil.serializeArray(originalStatus.getAuthenticatedRoles()));
831+
stmt.setString(10, originalStatus.getAssumedRole());
832+
stmt.setString(11, Constants.DBOS_INTERNAL_QUEUE);
833+
stmt.setString(12, JSONUtil.serializeArray(originalStatus.getInput()));
834+
stmt.setLong(13, workflowDeadlineEpoch);
835+
stmt.setLong(14, originalStatus.getWorkflowTimeoutMs());
836+
837+
838+
stmt.executeUpdate();
839+
}
840+
}
841+
842+
private void copyOperationOutputs(Connection connection,
843+
String originalWorkflowId,
844+
String forkedWorkflowId,
845+
int startStep) throws SQLException {
846+
847+
String sql = "INSERT INTO dbos.operation_outputs ( " +
848+
" workflow_uuid, function_id, output, error, function_name, child_workflow_id) " +
849+
" SELECT ? as workflow_uuid, function_id, output, error, function_name, child_workflow_id " +
850+
" FROM dbos.operation_outputs " +
851+
" WHERE workflow_uuid = ? " +
852+
" AND function_id < ? " ;
853+
854+
855+
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
856+
stmt.setString(1, forkedWorkflowId);
857+
stmt.setString(2, originalWorkflowId);
858+
stmt.setInt(3, startStep);
859+
860+
int rowsCopied = stmt.executeUpdate();
861+
System.out.println("Copied " + rowsCopied + " operation outputs to forked workflow");
862+
}
863+
}
864+
865+
/* public String forkWorkflow(String originalWorkflowId,
866+
String forkedWorkflowId,
867+
int startStep) throws SQLException {
868+
return forkWorkflow(originalWorkflowId, forkedWorkflowId, startStep, null);
869+
} */
870+
736871
private String getWorkflowStatus(Connection connection, String workflowId) throws SQLException {
737872
String sql = "SELECT status FROM dbos.workflow_status WHERE workflow_uuid = ?";
738873

src/main/java/dev/dbos/transact/exceptions/ErrorCode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ public enum ErrorCode {
1212
UNEXPECTED_STEP(8),
1313
WORKFLOW_FUNCTION_NOT_FOUND(9),
1414
SLEEP_NOT_IN_WORKFLOW(10),
15-
RESUME_WORKFLOW_ERROR(11);
15+
RESUME_WORKFLOW_ERROR(11),
16+
FORK_WORKFLOW_ERROR(12) ;
1617

1718
private int code ;
1819

src/main/java/dev/dbos/transact/execution/DBOSExecutor.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import dev.dbos.transact.queue.Queue;
1313
import dev.dbos.transact.queue.QueueRegistry;
1414
import dev.dbos.transact.queue.QueueService;
15+
import dev.dbos.transact.workflow.ForkOptions;
1516
import dev.dbos.transact.workflow.WorkflowHandle;
1617
import dev.dbos.transact.workflow.WorkflowState;
1718
import dev.dbos.transact.workflow.WorkflowStatus;
@@ -25,6 +26,7 @@
2526
import java.lang.reflect.InvocationTargetException;
2627
import java.lang.reflect.Method;
2728
import java.sql.SQLException;
29+
import java.util.Optional;
2830
import java.util.UUID;
2931
import java.util.concurrent.*;
3032
import java.util.function.Supplier;
@@ -81,7 +83,6 @@ public WorkflowInitResult preInvokeWorkflow(String workflowName,
8183

8284
long workflowTimeoutMs = DBOSContextHolder.get().getWorkflowTimeoutMs() ;
8385
long workflowDeadlineEpoch = 0 ;
84-
8586
if (workflowTimeoutMs > 0) {
8687
workflowDeadlineEpoch = System.currentTimeMillis() + workflowTimeoutMs ;
8788
}
@@ -158,6 +159,14 @@ public <T> T syncWorkflow(String workflowName,
158159

159160
WorkflowInitResult initResult = null;
160161

162+
DBOSContext ctx = DBOSContextHolder.get() ;
163+
if (ctx.hasParent()) {
164+
Optional<String> childId = systemDatabase.checkChildWorkflow(ctx.getParentWorkflowId(), ctx.getParentFunctionId()) ;
165+
if (childId.isPresent()) {
166+
return (T)systemDatabase.awaitWorkflowResult(childId.get()) ;
167+
}
168+
}
169+
161170
initResult = preInvokeWorkflow(workflowName, targetClassName, args, wfid, null);
162171

163172
if (initResult.getStatus().equals(WorkflowState.SUCCESS.name())) {
@@ -249,6 +258,16 @@ public <T> WorkflowHandle<T> submitWorkflow(String workflowName,
249258

250259
final String wfId = workflowId ;
251260

261+
262+
if (ctx.hasParent()) {
263+
Optional<String> childId = systemDatabase.checkChildWorkflow(ctx.getParentWorkflowId(), ctx.getParentFunctionId()) ;
264+
if (childId.isPresent()) {
265+
logger.info("child Id is present " + childId) ;
266+
return new WorkflowHandleDBPoll<>(childId.get(), systemDatabase);
267+
}
268+
}
269+
270+
252271
WorkflowInitResult initResult = preInvokeWorkflow(workflowName, targetClassName, args, wfId, null);
253272

254273
if (initResult.getStatus().equals(WorkflowState.SUCCESS.name())) {
@@ -319,6 +338,7 @@ public void enqueueWorkflow(String workflowName,
319338
DBOSContext ctx = DBOSContextHolder.get();
320339
String wfid = ctx.getWorkflowId() ;
321340

341+
322342
if (wfid == null) {
323343
wfid = UUID.randomUUID().toString();
324344
ctx.setWorkflowId(wfid);
@@ -494,4 +514,33 @@ public void cancelWorkflow(String workflowId) {
494514

495515
}
496516

517+
/* WorkflowHandle<?> forkWorkflow(String workflowId, String forkedWorkflowId, int startStep, String applicationVersion) {
518+
519+
if (forkedWorkflowId == null) {
520+
forkedWorkflowId = UUID.randomUUID().toString();
521+
}
522+
523+
final String newId = forkedWorkflowId ;
524+
525+
Supplier<String> forkFunction = () -> {
526+
logger.info(String.format("Forking workflow:%s from step:%d ", workflowId, startStep));
527+
528+
return systemDatabase.forkWorkflow(workflowId, newId, startStep, applicationVersion);
529+
};
530+
531+
systemDatabase.callFunctionAsStep(forkFunction, "DBOS.forkedWorkflow");
532+
return retrieveWorkflow(newId);
533+
} */
534+
535+
public WorkflowHandle<?> forkWorkflow(String workflowId, int startStep, ForkOptions options) {
536+
537+
Supplier<String> forkFunction = () -> {
538+
logger.info(String.format("Forking workflow:%s from step:%d ", workflowId, startStep));
539+
540+
return systemDatabase.forkWorkflow(workflowId, startStep, options);
541+
};
542+
543+
String forkedId = systemDatabase.callFunctionAsStep(forkFunction, "DBOS.forkedWorkflow");
544+
return retrieveWorkflow(forkedId);
545+
}
497546
}

0 commit comments

Comments
 (0)