Skip to content

Commit 30b9c32

Browse files
authored
DBOSExecutor refactoring (#355)
The primary purpose of this PR is to move proxy workflow/step execution from `DBOSInvocationHandler` to `DBOSExecutor` so that it can be leveraged by alternative proxy infrastructure (like Spring Boot). However, I took the opportunity to cleanup and refactor several aspects of `DBOSExecutor` to make future changes easier. > Note, the only test changes were due to internal method signature changes * reordered `DBOSExecutor` methods to group similar functionality together * added `DBOSExecutor.runStep` to be invoked by `DBOS.runStep` and `DBOSInvocationHandler.invoke` * renamed `callFunctionAsStep` to `runDbosFunctionAsStep` to better indicate its usage * `runStepInternal` is now private and is used by both `runStep` and `runDbosFunctionAsStep`, eliminating some duplicate code * moved invocation hook declarations and types into DBOSExecutor (Invocation and hookHolder ThreadLocal) * moved WorkflowHandleFuture.getResult implementation into DBOSExecutor * removed StepOptions.retriesAllowed and strongly typed retryInterval * assorted renames for clarity * Changed SystemDatabase.getPendingWorkflows to use listWorkflows under the hood. method now returns WorkflowStatus so I dropped GetPendingWorkflowsOutput
1 parent 55a39da commit 30b9c32

18 files changed

Lines changed: 832 additions & 1063 deletions

transact/src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,7 @@ public void setEvent(
568568
public <T, E extends Exception> T runStep(
569569
@NonNull ThrowingSupplier<T, E> stepfunc, @NonNull StepOptions opts) throws E {
570570

571-
return ensureLaunched("runStep").runStepInternal(stepfunc, opts, null);
571+
return ensureLaunched("runStep").runStep(stepfunc, opts, null);
572572
}
573573

574574
/**

transact/src/main/java/dev/dbos/transact/DBOSClient.java

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import dev.dbos.transact.database.StreamIterator;
88
import dev.dbos.transact.database.SystemDatabase;
99
import dev.dbos.transact.execution.DBOSExecutor;
10+
import dev.dbos.transact.execution.ExecutionOptions;
1011
import dev.dbos.transact.json.DBOSSerializer;
1112
import dev.dbos.transact.json.PortableWorkflowException;
1213
import dev.dbos.transact.json.SerializationUtil;
@@ -537,33 +538,34 @@ public EnqueueOptions(
537538
}
538539

539540
var workflowId =
540-
DBOSExecutor.enqueueWorkflow(
541-
options.workflowName(),
542-
options.className(),
543-
options.instanceName(),
544-
null,
545-
positionalArgs,
546-
namedArgs,
547-
new DBOSExecutor.ExecutionOptions(
548-
Objects.requireNonNullElseGet(
549-
options.workflowId(), () -> UUID.randomUUID().toString()),
550-
Timeout.of(options.timeout()),
551-
options.deadline,
552-
options.queueName(),
553-
options.deduplicationId,
554-
options.priority,
555-
options.queuePartitionKey,
556-
options.delay,
557-
options.appVersion,
558-
false,
559-
false,
560-
serializationFormat),
561-
null,
562-
null,
563-
null,
564-
null,
565-
systemDatabase,
566-
this.serializer);
541+
Objects.requireNonNullElseGet(options.workflowId(), () -> UUID.randomUUID().toString());
542+
543+
DBOSExecutor.enqueueWorkflow(
544+
options.workflowName(),
545+
options.className(),
546+
options.instanceName(),
547+
null,
548+
positionalArgs,
549+
namedArgs,
550+
new ExecutionOptions(
551+
workflowId,
552+
Timeout.of(options.timeout()),
553+
options.deadline,
554+
options.queueName(),
555+
options.deduplicationId,
556+
options.priority,
557+
options.queuePartitionKey,
558+
options.delay,
559+
options.appVersion,
560+
false,
561+
false,
562+
serializationFormat),
563+
null,
564+
null,
565+
null,
566+
null,
567+
systemDatabase,
568+
this.serializer);
567569

568570
return new WorkflowHandleClient<>(workflowId);
569571
}

transact/src/main/java/dev/dbos/transact/conductor/Conductor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import dev.dbos.transact.workflow.WorkflowHandle;
1212
import dev.dbos.transact.workflow.WorkflowSchedule;
1313
import dev.dbos.transact.workflow.WorkflowStatus;
14-
import dev.dbos.transact.workflow.internal.GetPendingWorkflowsOutput;
1514

1615
import java.io.ByteArrayInputStream;
1716
import java.io.ByteArrayOutputStream;
@@ -920,9 +919,9 @@ static CompletableFuture<BaseResponse> handleExistPendingWorkflows(
920919
() -> {
921920
ExistPendingWorkflowsRequest request = (ExistPendingWorkflowsRequest) message;
922921
try {
923-
List<GetPendingWorkflowsOutput> pending =
922+
var pending =
924923
conductor.systemDatabase.getPendingWorkflows(
925-
request.executor_id, request.application_version);
924+
List.of(request.executor_id), request.application_version);
926925
return new ExistPendingWorkflowsResponse(request, !pending.isEmpty());
927926
} catch (Exception e) {
928927
logger.error("Exception encountered when checking for pending workflows", e);

transact/src/main/java/dev/dbos/transact/context/DBOSContextHolder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ public class DBOSContextHolder {
99
ThreadLocal.withInitial(DBOSContext::new);
1010

1111
public static @NonNull DBOSContext get() {
12-
return contextHolder.get();
12+
// contextHolder is never null. Even if cleared, it gets set back to the initial value.
13+
return Objects.requireNonNull(contextHolder.get());
1314
}
1415

1516
public static void clear() {

transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import dev.dbos.transact.workflow.WorkflowSchedule;
2323
import dev.dbos.transact.workflow.WorkflowStatus;
2424
import dev.dbos.transact.workflow.WorkflowStream;
25-
import dev.dbos.transact.workflow.internal.GetPendingWorkflowsOutput;
2625
import dev.dbos.transact.workflow.internal.StepResult;
2726
import dev.dbos.transact.workflow.internal.WorkflowStatusInternal;
2827

@@ -303,8 +302,8 @@ public List<WorkflowAggregateRow> getWorkflowAggregates(GetWorkflowAggregatesInp
303302
return dbRetry(() -> workflowDAO.getWorkflowAggregates(input));
304303
}
305304

306-
public List<GetPendingWorkflowsOutput> getPendingWorkflows(String executorId, String appVersion) {
307-
return dbRetry(() -> workflowDAO.getPendingWorkflows(executorId, appVersion));
305+
public List<WorkflowStatus> getPendingWorkflows(List<String> executorIds, String appVersion) {
306+
return dbRetry(() -> workflowDAO.getPendingWorkflows(executorIds, appVersion));
308307
}
309308

310309
public boolean clearQueueAssignment(String workflowId) {

transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java

Lines changed: 7 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import dev.dbos.transact.workflow.WorkflowDelay;
2020
import dev.dbos.transact.workflow.WorkflowState;
2121
import dev.dbos.transact.workflow.WorkflowStatus;
22-
import dev.dbos.transact.workflow.internal.GetPendingWorkflowsOutput;
2322
import dev.dbos.transact.workflow.internal.StepResult;
2423
import dev.dbos.transact.workflow.internal.WorkflowStatusInternal;
2524

@@ -808,38 +807,14 @@ private static WorkflowStatus resultsToWorkflowStatus(
808807
return info;
809808
}
810809

811-
List<GetPendingWorkflowsOutput> getPendingWorkflows(String executorId, String appVersion)
810+
List<WorkflowStatus> getPendingWorkflows(List<String> executorIds, String appVersion)
812811
throws SQLException {
813-
814-
final String sql =
815-
"""
816-
SELECT workflow_uuid, queue_name
817-
FROM "%s".workflow_status
818-
WHERE status = ?
819-
AND executor_id = ?
820-
AND application_version = ?
821-
"""
822-
.formatted(this.schema);
823-
824-
List<GetPendingWorkflowsOutput> results = new ArrayList<>();
825-
826-
try (Connection connection = dataSource.getConnection();
827-
PreparedStatement stmt = connection.prepareStatement(sql)) {
828-
829-
stmt.setString(1, WorkflowState.PENDING.name());
830-
stmt.setString(2, executorId);
831-
stmt.setString(3, appVersion);
832-
833-
try (ResultSet rs = stmt.executeQuery()) {
834-
while (rs.next()) {
835-
results.add(
836-
new GetPendingWorkflowsOutput(
837-
rs.getString("workflow_uuid"), rs.getString("queue_name")));
838-
}
839-
}
840-
}
841-
842-
return results;
812+
var input =
813+
new ListWorkflowsInput()
814+
.withStatus(WorkflowState.PENDING)
815+
.withExecutorIds(executorIds)
816+
.withApplicationVersion(appVersion);
817+
return listWorkflows(input);
843818
}
844819

845820
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)