Skip to content

Commit 79c2125

Browse files
committed
sync test
1 parent 600ef10 commit 79c2125

3 files changed

Lines changed: 161 additions & 9 deletions

File tree

src/main/java/dev/dbos/transact/execution/DBOSExecutor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -490,13 +490,13 @@ public WorkflowHandle<?> resumeWorkflow(String workflowId) {
490490

491491
public void cancelWorkflow(String workflowId) {
492492

493-
Supplier<Void> resumeFunction = () -> {
493+
Supplier<Void> cancelFunction = () -> {
494494
logger.info("Cancelling workflow: ", workflowId);
495-
systemDatabase.resumeWorkflow(workflowId);
495+
systemDatabase.cancelWorkflow(workflowId);
496496
return null ; // void
497497
};
498498
// Execute the cancel operation as a workflow step
499-
systemDatabase.callFunctionAsStep(resumeFunction, "DBOS.resumeWorkflow");
499+
systemDatabase.callFunctionAsStep(cancelFunction, "DBOS.resumeWorkflow");
500500

501501
}
502502

src/test/java/dev/dbos/transact/workflow/MgmtServiceImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public class MgmtServiceImpl implements MgmtService{
99

1010
Logger logger = LoggerFactory.getLogger(MgmtServiceImpl.class) ;
1111

12-
private int stepsExecuted ;
12+
private volatile int stepsExecuted ;
1313
CountDownLatch mainThreadEvent ;
1414
CountDownLatch workflowEvent ;
1515

@@ -24,7 +24,7 @@ public MgmtServiceImpl(CountDownLatch mainLatch, CountDownLatch workLatch) {
2424
public void setMgmtService(MgmtService m) {
2525
service = m;
2626
}
27-
27+
2828

2929
@Workflow(name = "myworkflow")
3030
public int simpleWorkflow(int input) {
@@ -44,21 +44,21 @@ public int simpleWorkflow(int input) {
4444
}
4545

4646
@Step(name = "one")
47-
public void stepOne() {
47+
public synchronized void stepOne() {
4848
++stepsExecuted;
4949
}
5050

5151
@Step(name = "two")
52-
public void stepTwo() {
52+
public synchronized void stepTwo() {
5353
++stepsExecuted;
5454
}
5555

5656
@Step(name = "three")
57-
public void stepThree() {
57+
public synchronized void stepThree() {
5858
++stepsExecuted;
5959
}
6060

61-
public int getStepsExecuted() {
61+
public synchronized int getStepsExecuted() {
6262
return stepsExecuted ;
6363
}
6464

src/test/java/dev/dbos/transact/workflow/WorkflowMgmtTest.java

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
import dev.dbos.transact.context.DBOSOptions;
66
import dev.dbos.transact.context.SetDBOSOptions;
77
import dev.dbos.transact.database.SystemDatabase;
8+
import dev.dbos.transact.exceptions.WorkflowCancelledException;
89
import dev.dbos.transact.execution.DBOSExecutor;
10+
import dev.dbos.transact.execution.ExecutingService;
11+
import dev.dbos.transact.queue.Queue;
912
import dev.dbos.transact.utils.DBUtils;
1013
import org.junit.jupiter.api.AfterEach;
1114
import org.junit.jupiter.api.BeforeAll;
@@ -20,8 +23,11 @@
2023
import java.sql.SQLException;
2124
import java.sql.Statement;
2225
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
2328

2429
import static org.junit.jupiter.api.Assertions.assertEquals;
30+
import static org.junit.jupiter.api.Assertions.assertTrue;
2531

2632
public class WorkflowMgmtTest {
2733

@@ -103,14 +109,160 @@ public void asyncCancelResumeTest() throws Exception {
103109
workLatch.countDown();
104110

105111
assertEquals(1, mgmtService.getStepsExecuted()) ;
112+
WorkflowHandle h = dbosExecutor.retrieveWorkflow(workflowId) ;
113+
assertEquals(WorkflowState.CANCELLED.name(), h.getStatus().getStatus());
106114

107115
WorkflowHandle<?> handle = dbos.resumeWorkflow(workflowId) ;
108116

109117
result = (Integer) handle.getResult() ;
110118
assertEquals(23, result);
111119
assertEquals(3, mgmtService.getStepsExecuted()) ;
112120

121+
// resume again
122+
123+
handle = dbos.resumeWorkflow(workflowId) ;
124+
125+
result = (Integer) handle.getResult() ;
126+
assertEquals(23, result);
127+
assertEquals(3, mgmtService.getStepsExecuted()) ;
128+
129+
logger.info("Test completed");
130+
131+
}
132+
133+
@Test
134+
public void queuedCancelResumeTest() throws Exception {
135+
136+
CountDownLatch mainLatch = new CountDownLatch(1);
137+
CountDownLatch workLatch = new CountDownLatch(1);
138+
139+
MgmtService mgmtService = dbos.<MgmtService>Workflow()
140+
.interfaceClass(MgmtService.class)
141+
.implementation(new MgmtServiceImpl(mainLatch, workLatch))
142+
.build();
143+
mgmtService.setMgmtService(mgmtService);
144+
145+
Queue myqueue = new DBOS.QueueBuilder("myqueue").build();
146+
147+
String workflowId = "wfid1" ;
148+
DBOSOptions options = new DBOSOptions.Builder(workflowId).queue(myqueue).build();
149+
int result ;
150+
try (SetDBOSOptions o = new SetDBOSOptions(options)) {
151+
mgmtService.simpleWorkflow(23);
152+
}
153+
154+
mainLatch.await();
155+
dbos.cancelWorkflow(workflowId);
156+
workLatch.countDown();
157+
158+
assertEquals(1, mgmtService.getStepsExecuted()) ;
159+
WorkflowHandle h = dbosExecutor.retrieveWorkflow(workflowId) ;
160+
assertEquals(WorkflowState.CANCELLED.name(), h.getStatus().getStatus());
161+
162+
WorkflowHandle<?> handle = dbos.resumeWorkflow(workflowId) ;
163+
164+
result = (Integer) handle.getResult() ;
165+
assertEquals(23, result);
166+
assertEquals(3, mgmtService.getStepsExecuted()) ;
167+
168+
// resume again
169+
170+
handle = dbos.resumeWorkflow(workflowId) ;
171+
172+
result = (Integer) handle.getResult() ;
173+
assertEquals(23, result);
174+
assertEquals(3, mgmtService.getStepsExecuted()) ;
175+
113176
logger.info("Test completed");
114177

115178
}
179+
180+
181+
@Test
182+
public void syncCancelResumeTest() throws Exception {
183+
184+
CountDownLatch mainLatch = new CountDownLatch(1);
185+
CountDownLatch workLatch = new CountDownLatch(1);
186+
187+
MgmtService mgmtService = dbos.<MgmtService>Workflow()
188+
.interfaceClass(MgmtService.class)
189+
.implementation(new MgmtServiceImpl(mainLatch, workLatch))
190+
.build();
191+
mgmtService.setMgmtService(mgmtService);
192+
193+
194+
ExecutorService e = Executors.newFixedThreadPool(2);
195+
String workflowId = "wfid1" ;
196+
197+
CountDownLatch testLatch = new CountDownLatch(2) ;
198+
199+
e.submit(() -> {
200+
201+
DBOSOptions options = new DBOSOptions.Builder(workflowId).build();
202+
203+
try {
204+
try (SetDBOSOptions o = new SetDBOSOptions(options)) {
205+
mgmtService.simpleWorkflow(23);
206+
}
207+
} catch(Throwable t) {
208+
logger.info("caught ex");
209+
// assertTrue(t instanceof WorkflowCancelledException) ;
210+
}
211+
212+
assertEquals(1, mgmtService.getStepsExecuted()) ;
213+
logger.info("counting down latch") ;
214+
testLatch.countDown();
215+
logger.info("exiting thread 1") ;
216+
}) ;
217+
218+
219+
220+
// mainLatch.await();
221+
// dbos.cancelWorkflow(workflowId);
222+
// workLatch.countDown();
223+
224+
225+
226+
e.submit(() -> {
227+
try {
228+
mainLatch.await();
229+
dbos.cancelWorkflow(workflowId);
230+
workLatch.countDown();
231+
232+
logger.info("2 counting down lotch") ;
233+
testLatch.countDown();
234+
logger.info("2 after counting down lotch") ;
235+
236+
237+
} catch(InterruptedException ie) {
238+
logger.error(ie.toString()) ;
239+
}
240+
logger.info("Exiting thread 2") ;
241+
}) ;
242+
243+
logger.info("Before test latch");
244+
testLatch.await();
245+
logger.info("after test latch");
246+
247+
248+
249+
WorkflowHandle<?> handle = dbos.resumeWorkflow(workflowId) ;
250+
251+
int result = (Integer) handle.getResult() ;
252+
assertEquals(23, result);
253+
assertEquals(3, mgmtService.getStepsExecuted()) ;
254+
255+
// resume again
256+
257+
handle = dbos.resumeWorkflow(workflowId) ;
258+
259+
result = (Integer) handle.getResult() ;
260+
assertEquals(23, result);
261+
assertEquals(3, mgmtService.getStepsExecuted()) ;
262+
263+
logger.info("Test completed");
264+
265+
}
266+
267+
116268
}

0 commit comments

Comments
 (0)