Skip to content

Commit 6bbf842

Browse files
authored
Bulk WF Operations (#325)
cancel/resume/delete workflows in bulk, either via DBOS/DBOSClient API or Conductor. fixes #311
1 parent 5d702d1 commit 6bbf842

13 files changed

Lines changed: 569 additions & 244 deletions

File tree

transact/src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 94 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ public class DBOS implements AutoCloseable {
6060

6161
private AlertHandler alertHandler;
6262

63+
/**
64+
* Construct a new DBOS instance with the provided configuration.
65+
*
66+
* @param config the DBOS configuration; must not be null
67+
* @throws NullPointerException if config or required config fields are null
68+
*/
6369
public DBOS(@NonNull DBOSConfig config) {
6470
Objects.requireNonNull(config, "DBOSConfig must not be null");
6571
Objects.requireNonNull(config.appName(), "DBOSConfig.appName must not be null");
@@ -72,6 +78,10 @@ public DBOS(@NonNull DBOSConfig config) {
7278
this.config = config;
7379
}
7480

81+
/**
82+
* Close this DBOS instance and shut down all associated resources. This method delegates to
83+
* {@link #shutdown()}.
84+
*/
7585
@Override
7686
public void close() throws Exception {
7787
shutdown();
@@ -98,6 +108,11 @@ public void close() throws Exception {
98108
}
99109
}
100110

111+
/**
112+
* Get the current DBOS version.
113+
*
114+
* @return the DBOS version string
115+
*/
101116
public static String version() {
102117
return DBOS_VERSION;
103118
}
@@ -592,9 +607,26 @@ public <E extends Exception> void runStep(
592607
* @param workflowId id of the workflow
593608
* @return A handle to the workflow
594609
*/
610+
@SuppressWarnings("unchecked")
595611
public <T, E extends Exception> @NonNull WorkflowHandle<T, E> resumeWorkflow(
596612
@NonNull String workflowId) {
597-
return ensureLaunched("resumeWorkflow").resumeWorkflow(workflowId);
613+
var handles = resumeWorkflows(List.of(workflowId));
614+
assert (handles.size() == 1);
615+
return (WorkflowHandle<T, E>) handles.get(0);
616+
}
617+
618+
/**
619+
* Resume multiple workflows starting from the step after the last complete step for each
620+
* workflow. This method allows bulk resumption of workflows that were previously interrupted or
621+
* failed.
622+
*
623+
* @param workflowIds a list of workflow IDs to resume; must not be null
624+
* @return A list of handles to the resumed workflows
625+
* @throws IllegalStateException if called before DBOS is launched
626+
*/
627+
public @NonNull List<WorkflowHandle<Object, Exception>> resumeWorkflows(
628+
@NonNull List<String> workflowIds) {
629+
return ensureLaunched("resumeWorkflow").resumeWorkflows(workflowIds);
598630
}
599631

600632
/***
@@ -603,9 +635,69 @@ public <E extends Exception> void runStep(
603635
* current one) will not execute
604636
*
605637
* @param workflowId ID of the workflow to cancel
638+
* @throws IllegalStateException if called before DBOS is launched
606639
*/
607640
public void cancelWorkflow(@NonNull String workflowId) {
608-
ensureLaunched("cancelWorkflow").cancelWorkflow(workflowId);
641+
cancelWorkflows(List.of(workflowId));
642+
}
643+
644+
/**
645+
* Cancels multiple workflows. After this function is called, the next step (not the current one)
646+
* of each specified workflow will not execute.
647+
*
648+
* @param workflowIds a list of workflow IDs to cancel; must not be null
649+
* @throws IllegalStateException if called before DBOS is launched
650+
*/
651+
public void cancelWorkflows(@NonNull List<String> workflowIds) {
652+
ensureLaunched("cancelWorkflow").cancelWorkflows(workflowIds);
653+
}
654+
655+
/**
656+
* Delete a workflow from the system. This permanently removes the workflow and its associated
657+
* data from the database. Child workflows are preserved by default.
658+
*
659+
* @param workflowId ID of the workflow to delete; must not be null
660+
* @throws IllegalStateException if called before DBOS is launched
661+
*/
662+
public void deleteWorkflow(@NonNull String workflowId) {
663+
deleteWorkflows(List.of(workflowId), false);
664+
}
665+
666+
/**
667+
* Delete a workflow from the system. This permanently removes the workflow and its associated
668+
* data from the database.
669+
*
670+
* @param workflowId ID of the workflow to delete; must not be null
671+
* @param deleteChildren if true, also delete any child workflows; if false, preserve child
672+
* workflows
673+
* @throws IllegalStateException if called before DBOS is launched
674+
*/
675+
public void deleteWorkflow(@NonNull String workflowId, boolean deleteChildren) {
676+
deleteWorkflows(List.of(workflowId), deleteChildren);
677+
}
678+
679+
/**
680+
* Delete multiple workflows from the system. This permanently removes the workflows and their
681+
* associated data from the database. Child workflows are preserved by default.
682+
*
683+
* @param workflowIds a list of workflow IDs to delete; must not be null
684+
* @throws IllegalStateException if called before DBOS is launched
685+
*/
686+
public void deleteWorkflows(@NonNull List<String> workflowIds) {
687+
deleteWorkflows(workflowIds, false);
688+
}
689+
690+
/**
691+
* Delete multiple workflows from the system. This permanently removes the workflows and their
692+
* associated data from the database.
693+
*
694+
* @param workflowIds a list of workflow IDs to delete; must not be null
695+
* @param deleteChildren if true, also delete any child workflows; if false, preserve child
696+
* workflows
697+
* @throws IllegalStateException if called before DBOS is launched
698+
*/
699+
public void deleteWorkflows(@NonNull List<String> workflowIds, boolean deleteChildren) {
700+
ensureLaunched("deleteWorkflows").deleteWorkflows(workflowIds, deleteChildren);
609701
}
610702

611703
/**
@@ -639,28 +731,6 @@ public void cancelWorkflow(@NonNull String workflowId) {
639731
return forkWorkflow(workflowId, startStep, new ForkOptions());
640732
}
641733

642-
/**
643-
* Deletes a workflow from the system. Does not delete child workflows.
644-
*
645-
* @param workflowId the unique identifier of the workflow to delete. Must not be null.
646-
* @throws IllegalArgumentException if workflowId is null
647-
*/
648-
public void deleteWorkflow(@NonNull String workflowId) {
649-
deleteWorkflow(workflowId, false);
650-
}
651-
652-
/**
653-
* Deletes a workflow and optionally its child workflows from the system.
654-
*
655-
* @param workflowId the unique identifier of the workflow to delete. Must not be null.
656-
* @param deleteChildren if true, also deletes all child workflows associated with the specified
657-
* workflow; if false, only deletes the specified workflow
658-
* @throws IllegalArgumentException if workflowId is null
659-
*/
660-
public void deleteWorkflow(@NonNull String workflowId, boolean deleteChildren) {
661-
ensureLaunched("deleteWorkflow").deleteWorkflow(workflowId, deleteChildren);
662-
}
663-
664734
/**
665735
* Retrieve a handle to a workflow, given its ID. Note that a handle is always returned, whether
666736
* the workflow exists or not; getStatus() can be used to tell the difference

transact/src/main/java/dev/dbos/transact/DBOSClient.java

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,7 @@ public void send(
660660
*/
661661
public <T, E extends Exception> @NonNull WorkflowHandle<T, E> retrieveWorkflow(
662662
@NonNull String workflowId) {
663-
return new WorkflowHandleClient<T, E>(workflowId);
663+
return new WorkflowHandleClient<>(workflowId);
664664
}
665665

666666
/**
@@ -669,7 +669,17 @@ public void send(
669669
* @param workflowId ID of the workflow to cancel
670670
*/
671671
public void cancelWorkflow(@NonNull String workflowId) {
672-
systemDatabase.cancelWorkflow(workflowId);
672+
systemDatabase.cancelWorkflows(List.of(workflowId));
673+
}
674+
675+
/**
676+
* Cancel multiple workflows. After this function is called, the next step (not the current one)
677+
* of each specified workflow will not execute.
678+
*
679+
* @param workflowIds a list of workflow IDs to cancel; must not be null
680+
*/
681+
public void cancelWorkflows(@NonNull List<String> workflowIds) {
682+
systemDatabase.cancelWorkflows(workflowIds);
673683
}
674684

675685
/**
@@ -682,10 +692,68 @@ public void cancelWorkflow(@NonNull String workflowId) {
682692
*/
683693
public <T, E extends Exception> @NonNull WorkflowHandle<T, E> resumeWorkflow(
684694
@NonNull String workflowId) {
685-
systemDatabase.resumeWorkflow(workflowId);
695+
systemDatabase.resumeWorkflows(List.of(workflowId));
686696
return retrieveWorkflow(workflowId);
687697
}
688698

699+
/**
700+
* Resume multiple workflows starting from the step after the last complete step for each
701+
* workflow. This method allows bulk resumption of workflows that were previously interrupted or
702+
* failed.
703+
*
704+
* @param workflowIds a list of workflow IDs to resume; must not be null
705+
* @return A list of handles to the resumed workflows
706+
*/
707+
public @NonNull List<WorkflowHandle<Object, Exception>> resumeWorkflows(
708+
@NonNull List<String> workflowIds) {
709+
systemDatabase.resumeWorkflows(workflowIds);
710+
return workflowIds.stream().map(this::retrieveWorkflow).toList();
711+
}
712+
713+
/**
714+
* Delete a workflow from the system. This permanently removes the workflow and its associated
715+
* data from the database. Child workflows are preserved by default.
716+
*
717+
* @param workflowId ID of the workflow to delete; must not be null
718+
*/
719+
public void deleteWorkflow(@NonNull String workflowId) {
720+
deleteWorkflows(List.of(workflowId), false);
721+
}
722+
723+
/**
724+
* Delete a workflow from the system. This permanently removes the workflow and its associated
725+
* data from the database.
726+
*
727+
* @param workflowId ID of the workflow to delete; must not be null
728+
* @param deleteChildren if true, also delete any child workflows; if false, preserve child
729+
* workflows
730+
*/
731+
public void deleteWorkflow(@NonNull String workflowId, boolean deleteChildren) {
732+
deleteWorkflows(List.of(workflowId), deleteChildren);
733+
}
734+
735+
/**
736+
* Delete multiple workflows from the system. This permanently removes the workflows and their
737+
* associated data from the database. Child workflows are preserved by default.
738+
*
739+
* @param workflowIds a list of workflow IDs to delete; must not be null
740+
*/
741+
public void deleteWorkflows(@NonNull List<String> workflowIds) {
742+
deleteWorkflows(workflowIds, false);
743+
}
744+
745+
/**
746+
* Delete multiple workflows from the system. This permanently removes the workflows and their
747+
* associated data from the database.
748+
*
749+
* @param workflowIds a list of workflow IDs to delete; must not be null
750+
* @param deleteChildren if true, also delete any child workflows; if false, preserve child
751+
* workflows
752+
*/
753+
public void deleteWorkflows(@NonNull List<String> workflowIds, boolean deleteChildren) {
754+
systemDatabase.deleteWorkflows(workflowIds, deleteChildren);
755+
}
756+
689757
/**
690758
* Fork a workflow, providing a handle to the new workflow
691759
*

transact/src/main/java/dev/dbos/transact/admin/AdminServer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ private void cancel(HttpExchange exchange, String wfid) throws IOException {
230230

231231
logger.info("cancel workflow {}", wfid);
232232

233-
dbosExecutor.cancelWorkflow(wfid);
233+
dbosExecutor.cancelWorkflows(List.of(wfid));
234234
exchange.sendResponseHeaders(204, 0);
235235
}
236236

@@ -239,7 +239,7 @@ private void resume(HttpExchange exchange, String wfid) throws IOException {
239239

240240
logger.info("resume workflow {}", wfid);
241241

242-
dbosExecutor.resumeWorkflow(wfid);
242+
dbosExecutor.resumeWorkflows(List.of(wfid));
243243
exchange.sendResponseHeaders(204, 0);
244244
}
245245

transact/src/main/java/dev/dbos/transact/conductor/Conductor.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public void onOpen(WebSocket ws) {
183183

184184
@Override
185185
public CompletableFuture<?> onText(WebSocket ws, CharSequence data, boolean last) {
186-
logger.debug("onText data size {} last {}", data.length(), last);
186+
logger.trace("onText data size {} last {}", data.length(), last);
187187

188188
// Streaming import path: queue each frame for the worker thread (non-blocking)
189189
if (importFrameQueue != null) {
@@ -754,12 +754,15 @@ static CompletableFuture<BaseResponse> handleCancel(Conductor conductor, BaseMes
754754
return CompletableFuture.supplyAsync(
755755
() -> {
756756
CancelRequest request = (CancelRequest) message;
757+
List<String> ids =
758+
(request.workflow_ids != null && !request.workflow_ids.isEmpty())
759+
? request.workflow_ids
760+
: List.of(request.workflow_id);
757761
try {
758-
conductor.dbosExecutor.cancelWorkflow(request.workflow_id);
762+
conductor.dbosExecutor.cancelWorkflows(ids);
759763
return new SuccessResponse(request, true);
760764
} catch (Exception e) {
761-
logger.error(
762-
"Exception encountered when cancelling workflow {}", request.workflow_id, e);
765+
logger.error("Exception encountered when cancelling workflow(s) {}", ids, e);
763766
return new SuccessResponse(request, e);
764767
}
765768
});
@@ -769,11 +772,15 @@ static CompletableFuture<BaseResponse> handleDelete(Conductor conductor, BaseMes
769772
return CompletableFuture.supplyAsync(
770773
() -> {
771774
DeleteRequest request = (DeleteRequest) message;
775+
List<String> ids =
776+
(request.workflow_ids != null && !request.workflow_ids.isEmpty())
777+
? request.workflow_ids
778+
: List.of(request.workflow_id);
772779
try {
773-
conductor.dbosExecutor.deleteWorkflow(request.workflow_id, request.delete_children);
780+
conductor.systemDatabase.deleteWorkflows(ids, request.delete_children);
774781
return new SuccessResponse(request, true);
775782
} catch (Exception e) {
776-
logger.error("Exception encountered when deleting workflow {}", request.workflow_id, e);
783+
logger.error("Exception encountered when deleting workflow(s) {}", ids, e);
777784
return new SuccessResponse(request, e);
778785
}
779786
});
@@ -783,11 +790,15 @@ static CompletableFuture<BaseResponse> handleResume(Conductor conductor, BaseMes
783790
return CompletableFuture.supplyAsync(
784791
() -> {
785792
ResumeRequest request = (ResumeRequest) message;
793+
List<String> ids =
794+
(request.workflow_ids != null && !request.workflow_ids.isEmpty())
795+
? request.workflow_ids
796+
: List.of(request.workflow_id);
786797
try {
787-
conductor.dbosExecutor.resumeWorkflow(request.workflow_id);
798+
conductor.dbosExecutor.resumeWorkflows(ids);
788799
return new SuccessResponse(request, true);
789800
} catch (Exception e) {
790-
logger.error("Exception encountered when resuming workflow {}", request.workflow_id, e);
801+
logger.error("Exception encountered when resuming workflow(s) {}", ids, e);
791802
return new SuccessResponse(request, e);
792803
}
793804
});

transact/src/main/java/dev/dbos/transact/conductor/protocol/CancelRequest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package dev.dbos.transact.conductor.protocol;
22

3+
import java.util.List;
4+
35
public class CancelRequest extends BaseMessage {
46
public String workflow_id;
7+
public List<String> workflow_ids;
58

69
public CancelRequest() {}
710

transact/src/main/java/dev/dbos/transact/conductor/protocol/DeleteRequest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package dev.dbos.transact.conductor.protocol;
22

3+
import java.util.List;
4+
35
public class DeleteRequest extends BaseMessage {
46
public String workflow_id;
7+
public List<String> workflow_ids;
58
public boolean delete_children;
69

710
public DeleteRequest() {}

transact/src/main/java/dev/dbos/transact/conductor/protocol/ResumeRequest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package dev.dbos.transact.conductor.protocol;
22

3+
import java.util.List;
4+
35
public class ResumeRequest extends BaseMessage {
46
public String workflow_id;
7+
public List<String> workflow_ids;
58

69
public ResumeRequest() {}
710

0 commit comments

Comments
 (0)