Skip to content

Commit 9a6dbc9

Browse files
committed
resume first drop
1 parent 55b2a70 commit 9a6dbc9

17 files changed

Lines changed: 249 additions & 60 deletions

File tree

src/main/java/dev/dbos/transact/Constants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,6 @@ public class Constants {
1212
public static final String DEFAULT_EXECUTORID = "local";
1313

1414
public static final String DBOS_NULL_TOPIC = "__null__topic__" ;
15+
16+
public static final String DBOS_INTERNAL_QUEUE = "_dbos_internal_queue" ;
1517
}

src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ public void shutdown() {
268268
}
269269
}
270270

271-
public static WorkflowHandle retrieveWorkflow(String workflowId) {
271+
public static WorkflowHandle<?> retrieveWorkflow(String workflowId) {
272272
return DBOS.getInstance().dbosExecutor.retrieveWorkflow(workflowId);
273273
}
274274

@@ -342,5 +342,9 @@ public void sleep(float seconds) {
342342

343343
this.dbosExecutor.sleep(seconds) ;
344344
}
345+
346+
public WorkflowHandle<?> resumeWorkflow(String workflowId) {
347+
return this.dbosExecutor.resumeWorkflow(workflowId) ;
348+
}
345349
}
346350

src/main/java/dev/dbos/transact/database/StepsDAO.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ public void recordStepResultTxn(StepResult result, Connection connection) throws
6969
}
7070
}
7171

72-
7372
}
7473

7574

src/main/java/dev/dbos/transact/database/SystemDatabase.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44
import com.zaxxer.hikari.HikariDataSource;
55
import dev.dbos.transact.Constants;
66
import dev.dbos.transact.config.DBOSConfig;
7+
import dev.dbos.transact.context.DBOSContext;
8+
import dev.dbos.transact.context.DBOSContextHolder;
79
import dev.dbos.transact.exceptions.*;
10+
import dev.dbos.transact.json.JSONUtil;
811
import dev.dbos.transact.notifications.GetWorkflowEventContext;
912
import dev.dbos.transact.notifications.NotificationService;
1013
import dev.dbos.transact.queue.Queue;
@@ -21,6 +24,7 @@
2124
import javax.sql.DataSource;
2225
import java.sql.*;
2326
import java.util.*;
27+
import java.util.function.Supplier;
2428

2529
import static dev.dbos.transact.exceptions.ErrorCode.UNEXPECTED;
2630

@@ -302,6 +306,83 @@ public void cancelWorkflow(String workflowId) {
302306

303307
}
304308

309+
public void resumeWorkflow(String workflowId) {
310+
try {
311+
workflowDAO.resumeWorkflow(workflowId);
312+
} catch (SQLException s) {
313+
throw new DBOSException(ErrorCode.RESUME_WORKFLOW_ERROR.getCode(), s.getMessage()) ;
314+
}
315+
316+
}
317+
318+
319+
public <T> T callFunctionAsStep(Supplier<T> fn, String functionName) {
320+
DBOSContext ctx = DBOSContextHolder.get();
321+
322+
int nextFuncId = 0 ;
323+
324+
if (ctx != null && ctx.isInWorkflow()) {
325+
nextFuncId = ctx.getAndIncrementFunctionId() ;
326+
327+
StepResult result = null ;
328+
329+
try (Connection connection = dataSource.getConnection()) {
330+
result = stepsDAO.checkStepExecutionTxn(
331+
ctx.getWorkflowId(), nextFuncId, functionName, connection
332+
);
333+
} catch(SQLException e) {
334+
throw new DBOSException(UNEXPECTED.getCode(), "Function execution failed: " + functionName, e);
335+
}
336+
337+
if (result != null) {
338+
return handleExistingResult(result, functionName);
339+
}
340+
}
341+
342+
T functionResult;
343+
try {
344+
345+
try {
346+
functionResult = fn.get();
347+
} catch (Exception e) {
348+
if (ctx != null && ctx.isInWorkflow()) {
349+
// recordOperationError(ctx, functionName, e);
350+
String jsonError = JSONUtil.serializeError(e);
351+
StepResult r = new StepResult(ctx.getWorkflowId(), nextFuncId, functionName, null, jsonError);
352+
stepsDAO.recordStepResultTxn(r);
353+
}
354+
throw new DBOSException(UNEXPECTED.getCode(), "Function execution failed: " + functionName, e);
355+
}
356+
357+
// If we're in a workflow, record the successful result
358+
if (ctx != null && ctx.isInWorkflow()) {
359+
String jsonOutput = JSONUtil.serialize(functionResult);
360+
StepResult o = new StepResult(ctx.getWorkflowId(), nextFuncId, functionName, jsonOutput, null);
361+
stepsDAO.recordStepResultTxn(o);
362+
}
363+
} catch(SQLException sq) {
364+
throw new DBOSException(UNEXPECTED.getCode(), "Function execution failed: " + functionName, sq);
365+
}
366+
367+
return functionResult;
368+
}
369+
370+
@SuppressWarnings("unchecked")
371+
private <T> T handleExistingResult(StepResult result, String functionName) {
372+
if (result.getOutput() != null) {
373+
Object[] resArray = JSONUtil.deserializeToArray(result.getOutput());
374+
return resArray == null ? null : (T) resArray[0];
375+
} else if (result.getError() != null) {
376+
Object[] eArray = JSONUtil.deserializeToArray(result.getError());
377+
SerializableException se = (SerializableException) eArray[0];
378+
throw new DBOSAppException(String.format("Exception of type %s", se.className), se) ;
379+
} else {
380+
throw new IllegalStateException(
381+
String.format("Recorded output and error are both null for %s", functionName)
382+
);
383+
}
384+
}
385+
305386
private void createDataSource(String dbName) {
306387
HikariConfig hikariConfig = new HikariConfig();
307388

src/main/java/dev/dbos/transact/database/WorkflowDAO.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -700,4 +700,74 @@ public void cancelWorkflow(String workflowId) throws SQLException {
700700
}
701701
}
702702

703+
public void resumeWorkflow(String workflowId) throws SQLException {
704+
705+
try (Connection connection = dataSource.getConnection()) {
706+
connection.setAutoCommit(false);
707+
connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
708+
709+
try {
710+
String currentStatus = getWorkflowStatus(connection, workflowId);
711+
712+
713+
if (currentStatus == null) {
714+
connection.rollback();
715+
return;
716+
}
717+
718+
// If workflow is already complete, do nothing
719+
if (WorkflowState.SUCCESS.name().equals(currentStatus) ||
720+
WorkflowState.ERROR.name().equals(currentStatus)) {
721+
connection.rollback();
722+
return;
723+
}
724+
725+
// Set the workflow's status to ENQUEUED and clear recovery fields
726+
updateWorkflowToEnqueued(connection, workflowId);
727+
728+
connection.commit();
729+
730+
} catch (SQLException e) {
731+
connection.rollback();
732+
throw e;
733+
}
734+
}
735+
}
736+
737+
private String getWorkflowStatus(Connection connection, String workflowId) throws SQLException {
738+
String sql = "SELECT status FROM dbos.workflow_status WHERE workflow_uuid = ?";
739+
740+
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
741+
stmt.setString(1, workflowId);
742+
743+
try (ResultSet rs = stmt.executeQuery()) {
744+
if (rs.next()) {
745+
return rs.getString("status");
746+
}
747+
return null;
748+
}
749+
}
750+
}
751+
752+
private void updateWorkflowToEnqueued(Connection connection, String workflowId) throws SQLException {
753+
String sql = "UPDATE dbos.workflow_status " +
754+
" SET status = ?, " +
755+
" queue_name = ?, " +
756+
" recovery_attempts = ?, " +
757+
" workflow_deadline_epoch_ms = NULL, " +
758+
" deduplication_id = NULL, " +
759+
" started_at_epoch_ms = NULL " +
760+
" WHERE workflow_uuid = ? " ;
761+
762+
763+
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
764+
stmt.setString(1, WorkflowState.ENQUEUED.name());
765+
stmt.setString(2, Constants.DBOS_INTERNAL_QUEUE);
766+
stmt.setInt(3, 0); // recovery_attempts = 0
767+
stmt.setString(4, workflowId);
768+
769+
stmt.executeUpdate();
770+
}
771+
}
772+
703773
}

src/main/java/dev/dbos/transact/exceptions/ErrorCode.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ public enum ErrorCode {
1111
WORKFLOW_CANCELLED(7),
1212
UNEXPECTED_STEP(8),
1313
WORKFLOW_FUNCTION_NOT_FOUND(9),
14-
SLEEP_NOT_IN_WORKFLOW(10);
14+
SLEEP_NOT_IN_WORKFLOW(10),
15+
RESUME_WORKFLOW_ERROR(11);
1516

1617
private int code ;
1718

src/main/java/dev/dbos/transact/execution/DBOSExecutor.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424

2525
import java.lang.reflect.InvocationTargetException;
2626
import java.lang.reflect.Method;
27+
import java.sql.SQLException;
2728
import java.util.UUID;
2829
import java.util.concurrent.*;
30+
import java.util.function.Supplier;
2931

3032
import static dev.dbos.transact.exceptions.ErrorCode.UNEXPECTED;
3133

@@ -414,7 +416,7 @@ public <T> T runStep(String stepName,
414416
* Retrieve the workflowHandle for the workflowId
415417
*
416418
*/
417-
public WorkflowHandle retrieveWorkflow(String workflowId) {
419+
public WorkflowHandle<?> retrieveWorkflow(String workflowId) {
418420
return new WorkflowHandleDBPoll(workflowId, systemDatabase) ;
419421
}
420422

@@ -467,4 +469,16 @@ public void sleep(float seconds) {
467469

468470
}
469471

472+
public WorkflowHandle<?> resumeWorkflow(String workflowId) {
473+
// Define the function to be executed as a step
474+
Supplier<Void> resumeFunction = () -> {
475+
logger.info("Resuming workflow: {}", workflowId);
476+
systemDatabase.resumeWorkflow(workflowId);
477+
return null ; // void
478+
};
479+
// Execute the resume operation as a workflow step
480+
systemDatabase.callFunctionAsStep(resumeFunction, "DBOS.resumeWorkflow");
481+
return retrieveWorkflow(workflowId);
482+
}
483+
470484
}

src/main/java/dev/dbos/transact/json/JSONUtil.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator;
1111
import com.fasterxml.jackson.databind.jsontype.PolymorphicTypeValidator;
1212
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
13+
import dev.dbos.transact.exceptions.SerializableException;
1314

1415
import java.lang.reflect.Type;
1516

@@ -54,8 +55,19 @@ public static Object[] deserializeToArray(String json) {
5455
}
5556
}
5657

58+
public static String serializeError(Throwable error) {
5759

58-
public static <T> T deserialize(String json, Class<T> clazz) {
60+
SerializableException se = new SerializableException(error);
61+
return JSONUtil.serialize(se) ;
62+
}
63+
64+
public static SerializableException deserializeError(String json) {
65+
Object[] eArray = JSONUtil.deserializeToArray(json);
66+
return (SerializableException) eArray[0];
67+
}
68+
69+
70+
/* public static <T> T deserialize(String json, Class<T> clazz) {
5971
try {
6072
return mapper.readValue(json, clazz);
6173
} catch (JsonProcessingException e) {
@@ -69,7 +81,7 @@ public static <T> T deserialize(String json, TypeReference<T> typeRef) {
6981
} catch (JsonProcessingException e) {
7082
throw new RuntimeException("Deserialization failed", e);
7183
}
72-
}
84+
} */
7385

7486
}
7587

src/main/java/dev/dbos/transact/queue/QueueService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package dev.dbos.transact.queue;
22

33
import dev.dbos.transact.Constants;
4+
import dev.dbos.transact.DBOS;
45
import dev.dbos.transact.database.SystemDatabase;
56
import dev.dbos.transact.execution.DBOSExecutor;
67
import org.slf4j.Logger;
@@ -24,6 +25,8 @@ public class QueueService {
2425
private QueueRegistry queueRegistry ;
2526
private CountDownLatch shutdownLatch;
2627

28+
// private final Queue internalQueue = new DBOS.QueueBuilder(Constants.DBOS_INTERNAL_QUEUE).build() ;
29+
2730
public QueueService(SystemDatabase systemDatabase) {
2831
this.systemDatabase = systemDatabase ;
2932
queueRegistry = new QueueRegistry();

src/test/java/dev/dbos/transact/notifications/NotificationServiceTest.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,10 @@ public void basic_send_recv() throws Exception {
104104
notService.sendWorkflow(wfid1, "topic1", "HelloDBOS") ;
105105
}
106106

107-
WorkflowHandle<String> handle1 = DBOS.retrieveWorkflow(wfid1);
108-
WorkflowHandle<String> handle2 = DBOS.retrieveWorkflow(wfid2);
107+
WorkflowHandle<?> handle1 = DBOS.retrieveWorkflow(wfid1);
108+
WorkflowHandle<?> handle2 = DBOS.retrieveWorkflow(wfid2);
109109

110-
String result = handle1.getResult();
110+
String result = (String)handle1.getResult();
111111
assertEquals("HelloDBOS", result) ;
112112

113113
assertEquals(WorkflowState.SUCCESS.name(), handle1.getStatus().getStatus());
@@ -155,9 +155,9 @@ public void multiple_send_recv() throws Exception {
155155
}
156156
DBOS.retrieveWorkflow("send3").getResult();
157157

158-
WorkflowHandle<String> handle1 = DBOS.retrieveWorkflow(wfid1);
158+
WorkflowHandle<?> handle1 = DBOS.retrieveWorkflow(wfid1);
159159

160-
String result = handle1.getResult();
160+
String result = (String)handle1.getResult();
161161
assertEquals("Hello1Hello2Hello3", result) ;
162162

163163
assertEquals(WorkflowState.SUCCESS.name(), handle1.getStatus().getStatus());
@@ -185,10 +185,10 @@ public void notopic() throws Exception {
185185
notService.sendWorkflow(wfid1, null, "HelloDBOS") ;
186186
}
187187

188-
WorkflowHandle<String> handle1 = DBOS.retrieveWorkflow(wfid1);
189-
WorkflowHandle<String> handle2 = DBOS.retrieveWorkflow(wfid2);
188+
WorkflowHandle<?> handle1 = DBOS.retrieveWorkflow(wfid1);
189+
WorkflowHandle<?> handle2 = DBOS.retrieveWorkflow(wfid2);
190190

191-
String result = handle1.getResult();
191+
String result = (String)handle1.getResult();
192192
assertEquals("HelloDBOS", result) ;
193193

194194
assertEquals(WorkflowState.SUCCESS.name(), handle1.getStatus().getStatus());
@@ -251,10 +251,10 @@ public void sendNull() throws Exception {
251251
notService.sendWorkflow(wfid1, "topic1", null) ;
252252
}
253253

254-
WorkflowHandle<String> handle1 = DBOS.retrieveWorkflow(wfid1);
255-
WorkflowHandle<String> handle2 = DBOS.retrieveWorkflow(wfid2);
254+
WorkflowHandle<?> handle1 = DBOS.retrieveWorkflow(wfid1);
255+
WorkflowHandle<?> handle2 = DBOS.retrieveWorkflow(wfid2);
256256

257-
String result = handle1.getResult();
257+
String result = (String)handle1.getResult();
258258
assertNull(result) ;
259259

260260
assertEquals(WorkflowState.SUCCESS.name(), handle1.getStatus().getStatus());
@@ -351,10 +351,10 @@ public void recv_sleep() throws Exception {
351351
notService.sendWorkflow(wfid1, "topic1", "HelloDBOS") ;
352352
}
353353

354-
WorkflowHandle<String> handle1 = DBOS.retrieveWorkflow(wfid1);
355-
WorkflowHandle<String> handle2 = DBOS.retrieveWorkflow(wfid2);
354+
WorkflowHandle<?> handle1 = DBOS.retrieveWorkflow(wfid1);
355+
WorkflowHandle<?> handle2 = DBOS.retrieveWorkflow(wfid2);
356356

357-
String result = handle1.getResult();
357+
String result = (String)handle1.getResult();
358358
assertEquals("HelloDBOS", result) ;
359359

360360
assertEquals(WorkflowState.SUCCESS.name(), handle1.getStatus().getStatus());

0 commit comments

Comments
 (0)