Skip to content

Commit 600ef10

Browse files
committed
first cancel/resume test
1 parent 9a6dbc9 commit 600ef10

7 files changed

Lines changed: 242 additions & 6 deletions

File tree

src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,27 @@ public void sleep(float seconds) {
343343
this.dbosExecutor.sleep(seconds) ;
344344
}
345345

346+
/**
347+
*
348+
* Resume a workflow starting from the step after the last complete step
349+
*
350+
* @param workflowId id of the workflow
351+
* @return A handle to the workflow
352+
*/
346353
public WorkflowHandle<?> resumeWorkflow(String workflowId) {
347354
return this.dbosExecutor.resumeWorkflow(workflowId) ;
348355
}
356+
357+
/***
358+
*
359+
* Cancel the workflow. After this function is called, the next step (not the current one)
360+
* will not execute
361+
*
362+
* @param workflowId
363+
*/
364+
365+
public void cancelWorkflow(String workflowId) {
366+
this.dbosExecutor.cancelWorkflow(workflowId);
367+
}
349368
}
350369

src/main/java/dev/dbos/transact/database/WorkflowDAO.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,6 @@ public void recordWorkflowError(String workflowId, String error) {
358358
options.setError(error);
359359
options.setResetDeduplicationID(true);
360360

361-
362361
updateWorkflowStatus(connection, workflowId, WorkflowState.ERROR.toString(), options);
363362

364363
}
@@ -754,7 +753,7 @@ private void updateWorkflowToEnqueued(Connection connection, String workflowId)
754753
" SET status = ?, " +
755754
" queue_name = ?, " +
756755
" recovery_attempts = ?, " +
757-
" workflow_deadline_epoch_ms = NULL, " +
756+
" workflow_deadline_epoch_ms = 0, " +
758757
" deduplication_id = NULL, " +
759758
" started_at_epoch_ms = NULL " +
760759
" WHERE workflow_uuid = ? " ;

src/main/java/dev/dbos/transact/execution/DBOSExecutor.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -363,17 +363,22 @@ public <T> T runStep(String stepName,
363363

364364
if (recordedResult != null) {
365365

366+
logger.info("There is an recorded result for step " + stepFunctionId) ;
367+
366368
String output = recordedResult.getOutput() ;
367369
if (output != null) {
370+
logger.info("Result has an output") ;
368371
Object[] stepO = JSONUtil.deserializeToArray(output);
369-
return(T) stepO[0];
372+
return stepO == null ? null : (T) stepO[0];
370373
}
371374

372375
String error = recordedResult.getError();
373376
if (error != null) {
374377
// TODO: fix deserialization of errors
375378
throw new Exception(error);
376379
}
380+
} else {
381+
logger.info("There is an No recorded result for step " + stepFunctionId) ;
377382
}
378383

379384
int currAttempts = 1 ;
@@ -383,6 +388,8 @@ public <T> T runStep(String stepName,
383388

384389
while (retriedAllowed && currAttempts <= maxAttempts) {
385390

391+
logger.info("curr attempt " + currAttempts) ;
392+
386393
try {
387394
result = function.execute();
388395
serializedOutput = JSONUtil.serialize(result);
@@ -470,9 +477,9 @@ public void sleep(float seconds) {
470477
}
471478

472479
public WorkflowHandle<?> resumeWorkflow(String workflowId) {
473-
// Define the function to be executed as a step
480+
474481
Supplier<Void> resumeFunction = () -> {
475-
logger.info("Resuming workflow: {}", workflowId);
482+
logger.info("Resuming workflow: ", workflowId);
476483
systemDatabase.resumeWorkflow(workflowId);
477484
return null ; // void
478485
};
@@ -481,4 +488,16 @@ public WorkflowHandle<?> resumeWorkflow(String workflowId) {
481488
return retrieveWorkflow(workflowId);
482489
}
483490

491+
public void cancelWorkflow(String workflowId) {
492+
493+
Supplier<Void> resumeFunction = () -> {
494+
logger.info("Cancelling workflow: ", workflowId);
495+
systemDatabase.resumeWorkflow(workflowId);
496+
return null ; // void
497+
};
498+
// Execute the cancel operation as a workflow step
499+
systemDatabase.callFunctionAsStep(resumeFunction, "DBOS.resumeWorkflow");
500+
501+
}
502+
484503
}

src/main/java/dev/dbos/transact/queue/QueueService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public class QueueService {
2525
private QueueRegistry queueRegistry ;
2626
private CountDownLatch shutdownLatch;
2727

28-
// private final Queue internalQueue = new DBOS.QueueBuilder(Constants.DBOS_INTERNAL_QUEUE).build() ;
28+
private Queue internalQueue ;
2929

3030
public QueueService(SystemDatabase systemDatabase) {
3131
this.systemDatabase = systemDatabase ;
@@ -44,6 +44,8 @@ public void register(Queue queue) {
4444
private void pollForWorkflows() {
4545
logger.info("PollQueuesThread started ...." + Thread.currentThread().getId()) ;
4646

47+
internalQueue = new DBOS.QueueBuilder(Constants.DBOS_INTERNAL_QUEUE).build() ;
48+
4749
double pollingInterval = 1.0 ;
4850
double minPollingInterval = 1.0 ;
4951
double maxPollingInterval = 120.0 ;
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package dev.dbos.transact.workflow;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
5+
public interface MgmtService {
6+
7+
void setMgmtService(MgmtService m);
8+
int simpleWorkflow(int input) ;
9+
void stepOne() ;
10+
void stepTwo() ;
11+
void stepThree() ;
12+
13+
int getStepsExecuted() ;
14+
15+
16+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package dev.dbos.transact.workflow;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
import java.util.concurrent.CountDownLatch;
7+
8+
public class MgmtServiceImpl implements MgmtService{
9+
10+
Logger logger = LoggerFactory.getLogger(MgmtServiceImpl.class) ;
11+
12+
private int stepsExecuted ;
13+
CountDownLatch mainThreadEvent ;
14+
CountDownLatch workflowEvent ;
15+
16+
MgmtService service ;
17+
18+
public MgmtServiceImpl(CountDownLatch mainLatch, CountDownLatch workLatch) {
19+
this.mainThreadEvent = mainLatch ;
20+
this.workflowEvent = workLatch ;
21+
22+
}
23+
24+
public void setMgmtService(MgmtService m) {
25+
service = m;
26+
}
27+
28+
29+
@Workflow(name = "myworkflow")
30+
public int simpleWorkflow(int input) {
31+
32+
try {
33+
service.stepOne();
34+
mainThreadEvent.countDown();
35+
workflowEvent.await();
36+
service.stepTwo();
37+
service.stepThree();
38+
39+
} catch (InterruptedException e) {
40+
logger.error(e.getMessage());
41+
}
42+
43+
return input;
44+
}
45+
46+
@Step(name = "one")
47+
public void stepOne() {
48+
++stepsExecuted;
49+
}
50+
51+
@Step(name = "two")
52+
public void stepTwo() {
53+
++stepsExecuted;
54+
}
55+
56+
@Step(name = "three")
57+
public void stepThree() {
58+
++stepsExecuted;
59+
}
60+
61+
public int getStepsExecuted() {
62+
return stepsExecuted ;
63+
}
64+
65+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package dev.dbos.transact.workflow;
2+
3+
import dev.dbos.transact.DBOS;
4+
import dev.dbos.transact.config.DBOSConfig;
5+
import dev.dbos.transact.context.DBOSOptions;
6+
import dev.dbos.transact.context.SetDBOSOptions;
7+
import dev.dbos.transact.database.SystemDatabase;
8+
import dev.dbos.transact.execution.DBOSExecutor;
9+
import dev.dbos.transact.utils.DBUtils;
10+
import org.junit.jupiter.api.AfterEach;
11+
import org.junit.jupiter.api.BeforeAll;
12+
import org.junit.jupiter.api.BeforeEach;
13+
import org.junit.jupiter.api.Test;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
16+
17+
import javax.sql.DataSource;
18+
import java.sql.Connection;
19+
import java.sql.DriverManager;
20+
import java.sql.SQLException;
21+
import java.sql.Statement;
22+
import java.util.concurrent.CountDownLatch;
23+
24+
import static org.junit.jupiter.api.Assertions.assertEquals;
25+
26+
public class WorkflowMgmtTest {
27+
28+
Logger logger = LoggerFactory.getLogger(WorkflowMgmtTest.class);
29+
30+
private static DBOSConfig dbosConfig;
31+
private static DataSource dataSource ;
32+
private DBOS dbos ;
33+
private static SystemDatabase systemDatabase ;
34+
private DBOSExecutor dbosExecutor;
35+
36+
@BeforeAll
37+
static void onetimeSetup() throws Exception {
38+
39+
WorkflowMgmtTest.dbosConfig = new DBOSConfig
40+
.Builder()
41+
.name("systemdbtest")
42+
.dbHost("localhost")
43+
.dbPort(5432)
44+
.dbUser("postgres")
45+
.sysDbName("dbos_java_sys")
46+
.maximumPoolSize(2)
47+
.build();
48+
49+
String dbUrl = String.format("jdbc:postgresql://%s:%d/%s", dbosConfig.getDbHost(), dbosConfig.getDbPort(), "postgres");
50+
51+
String sysDb = dbosConfig.getSysDbName();
52+
try (Connection conn = DriverManager.getConnection(dbUrl, dbosConfig.getDbUser(), dbosConfig.getDbPassword());
53+
Statement stmt = conn.createStatement()) {
54+
55+
56+
String dropDbSql = String.format("DROP DATABASE IF EXISTS %s", sysDb);
57+
String createDbSql = String.format("CREATE DATABASE %s", sysDb);
58+
stmt.execute(dropDbSql);
59+
stmt.execute(createDbSql);
60+
}
61+
62+
}
63+
64+
@BeforeEach
65+
void beforeEachTest() throws SQLException {
66+
WorkflowMgmtTest.dataSource = DBUtils.createDataSource(dbosConfig) ;
67+
DBOS.initialize(dbosConfig);
68+
dbos = DBOS.getInstance();
69+
SystemDatabase.initialize(dataSource);
70+
systemDatabase = SystemDatabase.getInstance();
71+
dbosExecutor = new DBOSExecutor(dbosConfig, systemDatabase);
72+
dbos.setDbosExecutor(dbosExecutor);
73+
dbos.launch();
74+
DBUtils.clearTables(dataSource);
75+
}
76+
77+
@AfterEach
78+
void afterEachTest() throws SQLException {
79+
dbos.shutdown();
80+
}
81+
82+
@Test
83+
public void asyncCancelResumeTest() throws Exception {
84+
85+
CountDownLatch mainLatch = new CountDownLatch(1);
86+
CountDownLatch workLatch = new CountDownLatch(1);
87+
88+
MgmtService mgmtService = dbos.<MgmtService>Workflow()
89+
.interfaceClass(MgmtService.class)
90+
.implementation(new MgmtServiceImpl(mainLatch, workLatch))
91+
.build();
92+
mgmtService.setMgmtService(mgmtService);
93+
94+
String workflowId = "wfid1" ;
95+
DBOSOptions options = new DBOSOptions.Builder(workflowId).async().build();
96+
int result ;
97+
try (SetDBOSOptions o = new SetDBOSOptions(options)) {
98+
mgmtService.simpleWorkflow(23);
99+
}
100+
101+
mainLatch.await();
102+
dbos.cancelWorkflow(workflowId);
103+
workLatch.countDown();
104+
105+
assertEquals(1, mgmtService.getStepsExecuted()) ;
106+
107+
WorkflowHandle<?> handle = dbos.resumeWorkflow(workflowId) ;
108+
109+
result = (Integer) handle.getResult() ;
110+
assertEquals(23, result);
111+
assertEquals(3, mgmtService.getStepsExecuted()) ;
112+
113+
logger.info("Test completed");
114+
115+
}
116+
}

0 commit comments

Comments
 (0)