Skip to content

Commit e034d97

Browse files
authored
Resume/Fork on specified queue (#343)
fixes #305
1 parent 1872afa commit e034d97

18 files changed

Lines changed: 481 additions & 94 deletions

File tree

transact/src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -614,33 +614,73 @@ public <E extends Exception> void runStep(
614614
}
615615

616616
/**
617-
* Resume a workflow starting from the step after the last complete step
617+
* Resume a workflow starting from the step after the last complete step. This method allows
618+
* resuming workflows that were previously interrupted, failed, or canceled. The workflow will
619+
* continue execution from where it left off, replaying any completed steps deterministically.
618620
*
619621
* @param <T> Return type of the workflow function
620-
* @param <E> Checked exception thrown by the workflow function, if any
621-
* @param workflowId id of the workflow
622-
* @return A handle to the workflow
622+
* @param <E> Type of checked exception thrown by the workflow function, if any
623+
* @param workflowId ID of the workflow to resume; must not be null
624+
* @param queueName optional queue name to enqueue the resumed workflow to; if null, the workflow
625+
* will be resumed in the default execution context
626+
* @return A handle to the resumed workflow
627+
* @throws IllegalStateException if called before DBOS is launched
623628
*/
624629
@SuppressWarnings("unchecked")
625630
public <T, E extends Exception> @NonNull WorkflowHandle<T, E> resumeWorkflow(
626-
@NonNull String workflowId) {
627-
var handles = resumeWorkflows(List.of(workflowId));
631+
@NonNull String workflowId, @Nullable String queueName) {
632+
var handles = resumeWorkflows(List.of(workflowId), queueName);
628633
assert (handles.size() == 1);
629634
return (WorkflowHandle<T, E>) handles.get(0);
630635
}
631636

637+
/**
638+
* Resume a workflow starting from the step after the last complete step using the default queue.
639+
* This method is equivalent to calling {@code resumeWorkflow(workflowId, null)}. The workflow
640+
* will continue execution from where it left off, replaying any completed steps
641+
* deterministically.
642+
*
643+
* @param <T> Return type of the workflow function
644+
* @param <E> Type of checked exception thrown by the workflow function, if any
645+
* @param workflowId ID of the workflow to resume; must not be null
646+
* @return A handle to the resumed workflow
647+
* @throws IllegalStateException if called before DBOS is launched
648+
*/
649+
public <T, E extends Exception> @NonNull WorkflowHandle<T, E> resumeWorkflow(
650+
@NonNull String workflowId) {
651+
return resumeWorkflow(workflowId, null);
652+
}
653+
632654
/**
633655
* Resume multiple workflows starting from the step after the last complete step for each
634-
* workflow. This method allows bulk resumption of workflows that were previously interrupted or
635-
* failed.
656+
* workflow. This method allows bulk resumption of workflows that were previously interrupted,
657+
* failed, or canceled. Each workflow will continue execution from where it left off, replaying
658+
* any completed steps deterministically.
659+
*
660+
* @param workflowIds a list of workflow IDs to resume; must not be null
661+
* @param queueName optional queue name to enqueue the resumed workflows to; if null, the
662+
* workflows will be resumed in the default execution context
663+
* @return A list of handles to the resumed workflows
664+
* @throws IllegalStateException if called before DBOS is launched
665+
*/
666+
public @NonNull List<WorkflowHandle<Object, Exception>> resumeWorkflows(
667+
@NonNull List<String> workflowIds, @Nullable String queueName) {
668+
return ensureLaunched("resumeWorkflow").resumeWorkflows(workflowIds, queueName);
669+
}
670+
671+
/**
672+
* Resume multiple workflows starting from the step after the last complete step for each workflow
673+
* using the default queue. This method is equivalent to calling {@code
674+
* resumeWorkflows(workflowIds, null)}. Each workflow will continue execution from where it left
675+
* off, replaying any completed steps deterministically.
636676
*
637677
* @param workflowIds a list of workflow IDs to resume; must not be null
638678
* @return A list of handles to the resumed workflows
639679
* @throws IllegalStateException if called before DBOS is launched
640680
*/
641681
public @NonNull List<WorkflowHandle<Object, Exception>> resumeWorkflows(
642682
@NonNull List<String> workflowIds) {
643-
return ensureLaunched("resumeWorkflow").resumeWorkflows(workflowIds);
683+
return resumeWorkflows(workflowIds, null);
644684
}
645685

646686
/***

transact/src/main/java/dev/dbos/transact/DBOSClient.java

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -676,30 +676,67 @@ public void cancelWorkflows(@NonNull List<String> workflowIds) {
676676
}
677677

678678
/**
679-
* Resume a canceled workflow, providing a handle to the workflow
679+
* Resume a workflow starting from the step after the last complete step. This method allows
680+
* resuming workflows that were previously interrupted, failed, or canceled. The workflow will
681+
* continue execution from where it left off, replaying any completed steps deterministically.
680682
*
681683
* @param <T> Type of the workflow's return value
682684
* @param <E> Type of any checked exception thrown by the workflow
683-
* @param workflowId ID of the workflow to resume
684-
* @return `WorkflowHandle` for the resumed workflow
685+
* @param workflowId ID of the workflow to resume; must not be null
686+
* @param queueName optional queue name to enqueue the resumed workflow to; if null, the workflow
687+
* will be resumed in the default execution context
688+
* @return WorkflowHandle for the resumed workflow
685689
*/
686690
public <T, E extends Exception> @NonNull WorkflowHandle<T, E> resumeWorkflow(
687-
@NonNull String workflowId) {
688-
systemDatabase.resumeWorkflows(List.of(workflowId));
691+
@NonNull String workflowId, @Nullable String queueName) {
692+
systemDatabase.resumeWorkflows(List.of(workflowId), queueName);
689693
return retrieveWorkflow(workflowId);
690694
}
691695

692696
/**
693-
* Resume multiple workflows starting from the step after the last complete step for each
694-
* workflow. This method allows bulk resumption of workflows that were previously interrupted or
695-
* failed.
697+
* Resume a workflow starting from the step after the last complete step using the default queue.
698+
* This method is equivalent to calling {@code resumeWorkflow(workflowId, null)}. The workflow
699+
* will continue execution from where it left off, replaying any completed steps
700+
* deterministically.
701+
*
702+
* @param <T> Type of the workflow's return value
703+
* @param <E> Type of any checked exception thrown by the workflow
704+
* @param workflowId ID of the workflow to resume; must not be null
705+
* @return WorkflowHandle for the resumed workflow
706+
*/
707+
public <T, E extends Exception> @NonNull WorkflowHandle<T, E> resumeWorkflow(
708+
@NonNull String workflowId) {
709+
return resumeWorkflow(workflowId, null);
710+
}
711+
712+
/**
713+
* Resume multiple workflows starting from the step after the last complete step for each workflow
714+
* using the default queue. This method is equivalent to calling {@code
715+
* resumeWorkflows(workflowIds, null)}. Each workflow will continue execution from where it left
716+
* off, replaying any completed steps deterministically.
696717
*
697718
* @param workflowIds a list of workflow IDs to resume; must not be null
698719
* @return A list of handles to the resumed workflows
699720
*/
700721
public @NonNull List<WorkflowHandle<Object, Exception>> resumeWorkflows(
701722
@NonNull List<String> workflowIds) {
702-
systemDatabase.resumeWorkflows(workflowIds);
723+
return resumeWorkflows(workflowIds, null);
724+
}
725+
726+
/**
727+
* Resume multiple workflows starting from the step after the last complete step for each
728+
* workflow. This method allows bulk resumption of workflows that were previously interrupted,
729+
* failed, or canceled. Each workflow will continue execution from where it left off, replaying
730+
* any completed steps deterministically.
731+
*
732+
* @param workflowIds a list of workflow IDs to resume; must not be null
733+
* @param queueName optional queue name to enqueue the resumed workflows to; if null, the
734+
* workflows will be resumed in the default execution context
735+
* @return A list of handles to the resumed workflows
736+
*/
737+
public @NonNull List<WorkflowHandle<Object, Exception>> resumeWorkflows(
738+
@NonNull List<String> workflowIds, @Nullable String queueName) {
739+
systemDatabase.resumeWorkflows(workflowIds, queueName);
703740
return workflowIds.stream().map(this::retrieveWorkflow).toList();
704741
}
705742

transact/src/main/java/dev/dbos/transact/admin/AdminServer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ private void resume(HttpExchange exchange, String wfid) throws IOException {
239239

240240
logger.info("resume workflow {}", wfid);
241241

242-
dbosExecutor.resumeWorkflows(List.of(wfid));
242+
dbosExecutor.resumeWorkflows(List.of(wfid), null);
243243
exchange.sendResponseHeaders(204, 0);
244244
}
245245

@@ -248,7 +248,9 @@ private void fork(HttpExchange exchange, String wfid) throws IOException {
248248

249249
var request = mapper.readValue(exchange.getRequestBody(), ForkRequest.class);
250250
int startStep = request.start_step == null ? 0 : request.start_step;
251-
var options = new ForkOptions(request.new_workflow_id, request.application_version, null);
251+
var options =
252+
new ForkOptions(request.new_workflow_id)
253+
.withApplicationVersion(request.application_version);
252254

253255
logger.info("fork workflow {} step {}", wfid, startStep);
254256
var handle = dbosExecutor.forkWorkflow(wfid, startStep, options);

transact/src/main/java/dev/dbos/transact/conductor/Conductor.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -778,7 +778,7 @@ static CompletableFuture<BaseResponse> handleResume(Conductor conductor, BaseMes
778778
? request.workflow_ids
779779
: List.of(request.workflow_id);
780780
try {
781-
conductor.dbosExecutor.resumeWorkflows(ids);
781+
conductor.dbosExecutor.resumeWorkflows(ids, request.queue_name);
782782
return new SuccessResponse(request, true);
783783
} catch (Exception e) {
784784
logger.error("Exception encountered when resuming workflow(s) {}", ids, e);
@@ -811,12 +811,9 @@ static CompletableFuture<BaseResponse> handleFork(Conductor conductor, BaseMessa
811811
return new ForkWorkflowResponse(request, null, "Invalid Fork Workflow Request");
812812
}
813813
try {
814-
var options =
815-
new ForkOptions(
816-
request.body.new_workflow_id, request.body.application_version, null);
817814
WorkflowHandle<?, ?> handle =
818815
conductor.dbosExecutor.forkWorkflow(
819-
request.body.workflow_id, request.body.start_step, options);
816+
request.body.workflow_id, request.body.start_step, request.toOptions());
820817
return new ForkWorkflowResponse(request, handle.workflowId());
821818
} catch (Exception e) {
822819
logger.error("Exception encountered when forking workflow {}", request, e);

transact/src/main/java/dev/dbos/transact/conductor/protocol/ForkWorkflowRequest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package dev.dbos.transact.conductor.protocol;
22

3+
import dev.dbos.transact.workflow.ForkOptions;
4+
35
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
46

57
public class ForkWorkflowRequest extends BaseMessage {
@@ -28,5 +30,16 @@ public static class ForkWorkflowBody {
2830
public Integer start_step;
2931
public String application_version; // optional
3032
public String new_workflow_id; // optional
33+
public String queue_name; // optional
34+
public String queue_partition_key; // optional
35+
}
36+
37+
public ForkOptions toOptions() {
38+
return new ForkOptions(
39+
body.new_workflow_id,
40+
body.application_version,
41+
null,
42+
body.queue_name,
43+
body.queue_partition_key);
3144
}
3245
}

transact/src/main/java/dev/dbos/transact/conductor/protocol/ResumeRequest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
public class ResumeRequest extends BaseMessage {
66
public String workflow_id;
77
public List<String> workflow_ids;
8+
public String queue_name;
89

910
public ResumeRequest() {}
1011

transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -396,8 +396,8 @@ public void cancelWorkflows(List<String> workflowIds) {
396396
dbRetry(() -> workflowDAO.cancelWorkflows(workflowIds));
397397
}
398398

399-
public void resumeWorkflows(List<String> workflowIds) {
400-
dbRetry(() -> workflowDAO.resumeWorkflows(workflowIds));
399+
public void resumeWorkflows(List<String> workflowIds, String queueName) {
400+
dbRetry(() -> workflowDAO.resumeWorkflows(workflowIds, queueName));
401401
}
402402

403403
public void deleteWorkflows(List<String> workflowIds, boolean deleteChildren) {

transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -803,11 +803,12 @@ AND status NOT IN (?, ?)
803803
}
804804
}
805805

806-
void resumeWorkflows(List<String> workflowIds) throws SQLException {
806+
void resumeWorkflows(List<String> workflowIds, String queueName) throws SQLException {
807807
List<String> filtered = filterNullsAndBlanks(workflowIds);
808808
if (filtered.isEmpty()) {
809809
return;
810810
}
811+
811812
String sql =
812813
"""
813814
UPDATE "%s".workflow_status
@@ -827,7 +828,7 @@ AND status NOT IN (?, ?)
827828
Array array = conn.createArrayOf("text", filtered.toArray(String[]::new));
828829
try {
829830
stmt.setString(1, WorkflowState.ENQUEUED.name());
830-
stmt.setString(2, Constants.DBOS_INTERNAL_QUEUE);
831+
stmt.setString(2, Objects.requireNonNullElse(queueName, Constants.DBOS_INTERNAL_QUEUE));
831832
stmt.setArray(3, array);
832833
stmt.setString(4, WorkflowState.SUCCESS.name());
833834
stmt.setString(5, WorkflowState.ERROR.name());
@@ -915,14 +916,11 @@ String forkWorkflow(String originalWorkflowId, int startStep, ForkOptions option
915916
}
916917

917918
String forkedWorkflowId =
918-
options.forkedWorkflowId() == null
919-
? UUID.randomUUID().toString()
920-
: options.forkedWorkflowId();
919+
Objects.requireNonNullElseGet(
920+
options.forkedWorkflowId(), () -> UUID.randomUUID().toString());
921921

922922
logger.debug("forkWorkflow Original id {} forked id {}", originalWorkflowId, forkedWorkflowId);
923923

924-
String applicationVersion = options.applicationVersion();
925-
926924
var timeout = Objects.requireNonNullElseGet(options.timeout(), Timeout::inherit);
927925
Long timeoutMS = null;
928926
if (timeout instanceof Timeout.Inherit) {
@@ -941,8 +939,10 @@ String forkWorkflow(String originalWorkflowId, int startStep, ForkOptions option
941939
originalWorkflowId,
942940
forkedWorkflowId,
943941
status,
944-
applicationVersion,
942+
options.applicationVersion(),
945943
timeoutMS,
944+
options.queueName(),
945+
options.queuePartitionKey(),
946946
this.schema,
947947
this.serializer);
948948

@@ -969,6 +969,8 @@ private static void insertForkedWorkflowStatus(
969969
WorkflowStatus originalStatus,
970970
String applicationVersion,
971971
Long timeoutMS,
972+
String queueName,
973+
String queuePartitionKey,
972974
String schema,
973975
DBOSSerializer serializer)
974976
throws SQLException {
@@ -978,8 +980,9 @@ private static void insertForkedWorkflowStatus(
978980
"""
979981
INSERT INTO "%s".workflow_status (
980982
workflow_uuid, status, name, class_name, config_name, application_version, application_id,
981-
authenticated_user, authenticated_roles, assumed_role, queue_name, inputs, workflow_timeout_ms, forked_from, serialization
982-
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
983+
authenticated_user, authenticated_roles, assumed_role, queue_name, queue_partition_key, inputs,
984+
workflow_timeout_ms, forked_from, serialization
985+
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
983986
"""
984987
.formatted(schema);
985988

@@ -998,15 +1001,16 @@ private static void insertForkedWorkflowStatus(
9981001
? null
9991002
: JSONUtil.toJson(originalStatus.authenticatedRoles()));
10001003
stmt.setString(10, originalStatus.assumedRole());
1001-
stmt.setString(11, Constants.DBOS_INTERNAL_QUEUE);
1004+
stmt.setString(11, Objects.requireNonNullElse(queueName, Constants.DBOS_INTERNAL_QUEUE));
1005+
stmt.setString(12, queuePartitionKey);
10021006
stmt.setString(
1003-
12,
1007+
13,
10041008
SerializationUtil.serializeArgs(
10051009
originalStatus.input(), null, originalStatus.serialization(), serializer)
10061010
.serializedValue());
1007-
stmt.setObject(13, timeoutMS);
1008-
stmt.setString(14, originalWorkflowId);
1009-
stmt.setString(15, originalStatus.serialization());
1011+
stmt.setObject(14, timeoutMS);
1012+
stmt.setString(15, originalWorkflowId);
1013+
stmt.setString(16, originalStatus.serialization());
10101014

10111015
stmt.executeUpdate();
10121016
}

0 commit comments

Comments
 (0)