Skip to content

Commit 3717de4

Browse files
authored
fix and re-enable testQueueConcurrencyUnderRecovery (#45)
Fixes #44
1 parent b1fc0e0 commit 3717de4

7 files changed

Lines changed: 31 additions & 103 deletions

File tree

src/main/java/dev/dbos/transact/interceptor/BaseInvocationHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ protected Object handleWorkflow(Method method, Object[] args, Workflow workflow)
5050

5151
String workflowName = workflow.name().isEmpty() ? method.getName() : workflow.name();
5252

53-
String msg = String.format("Before: Starting workflow '%s' (timeout: %ds)%n",
53+
String msg = String.format("Before: Starting workflow '%s' (timeout: %ds)",
5454
workflowName,
5555
workflow.timeout());
5656

src/main/java/dev/dbos/transact/queue/QueueService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ private void pollForWorkflows() {
9898

9999
} finally {
100100
shutdownLatch.countDown();
101-
logger.info("QueuesPolThread has ended. Exiting " + Thread.currentThread().getId());
101+
logger.info("QueuesPollThread has ended. Exiting " + Thread.currentThread().getId());
102102
}
103103
}
104104

src/test/java/dev/dbos/transact/conductor/TestWebSocketServer.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package dev.dbos.transact.conductor;
22

3-
import dev.dbos.transact.utils.ManualResetEvent;
4-
53
import java.net.InetSocketAddress;
4+
import java.util.concurrent.Semaphore;
5+
import java.util.concurrent.TimeUnit;
66

77
import org.java_websocket.WebSocket;
88
import org.java_websocket.framing.Framedata;
@@ -38,7 +38,7 @@ default void onClose(WebSocket conn, int code, String reason, boolean remote) {
3838

3939
private static Logger logger = LoggerFactory.getLogger(TestWebSocketServer.class);
4040
private WebSocketTestListener listener;
41-
private ManualResetEvent startEvent = new ManualResetEvent(false);
41+
private Semaphore startEvent = new Semaphore(0);
4242

4343
public TestWebSocketServer(int port) {
4444
super(new InetSocketAddress(port));
@@ -48,12 +48,8 @@ public void setListener(WebSocketTestListener listener) {
4848
this.listener = listener;
4949
}
5050

51-
public void waitStart() throws InterruptedException {
52-
startEvent.waitOne();
53-
}
54-
5551
public boolean waitStart(long millis) throws InterruptedException {
56-
return startEvent.waitOne(millis);
52+
return startEvent.tryAcquire(millis, TimeUnit.MILLISECONDS);
5753
}
5854

5955
@Override
@@ -66,7 +62,7 @@ public void onOpen(WebSocket conn, ClientHandshake handshake) {
6662

6763
@Override
6864
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
69-
startEvent.reset();
65+
startEvent.drainPermits();
7066
logger.info("onClose");
7167
if (listener != null) {
7268
listener.onClose(conn, code, reason, remote);
@@ -91,7 +87,7 @@ public void onWebsocketPing(WebSocket conn, Framedata f) {
9187

9288
@Override
9389
public void onError(WebSocket conn, Exception ex) {
94-
startEvent.reset();
90+
startEvent.drainPermits();
9591
logger.error("onError", ex);
9692
if (listener != null) {
9793
listener.onError(conn, ex);
@@ -100,7 +96,7 @@ public void onError(WebSocket conn, Exception ex) {
10096

10197
@Override
10298
public void onStart() {
103-
startEvent.set();
99+
startEvent.release();
104100
logger.info("onStart {}", getPort());
105101
if (listener != null) {
106102
listener.onStart();

src/test/java/dev/dbos/transact/queue/ConcurrencyTestService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@
33
public interface ConcurrencyTestService {
44
public int noopWorkflow(int i);
55

6-
public int blockedWorkflow(int i);
6+
public int blockedWorkflow(int i) throws InterruptedException;
77
}
Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
package dev.dbos.transact.queue;
22

3-
import dev.dbos.transact.utils.ManualResetEvent;
43
import dev.dbos.transact.workflow.Workflow;
54

65
import java.util.List;
6+
import java.util.concurrent.CountDownLatch;
7+
import java.util.concurrent.Semaphore;
8+
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
711

812
public class ConcurrencyTestServiceImpl implements ConcurrencyTestService {
913

10-
public ManualResetEvent event = new ManualResetEvent(false);
11-
public List<ManualResetEvent> wfEvents = List.of(new ManualResetEvent(false), new ManualResetEvent(false));
14+
private static final Logger logger = LoggerFactory.getLogger(ConcurrencyTestServiceImpl.class);
15+
16+
public CountDownLatch latch = new CountDownLatch(1);
17+
public List<Semaphore> wfSemaphores = List.of(new Semaphore(0), new Semaphore(0));
1218
public int counter = 0;
1319

1420
@Workflow(name = "noopWorkflow")
@@ -17,11 +23,11 @@ public int noopWorkflow(int i) {
1723
}
1824

1925
@Workflow(name = "blockedWorkflow")
20-
public int blockedWorkflow(int i) {
21-
wfEvents.get(i).set();
26+
public int blockedWorkflow(int i) throws InterruptedException {
27+
logger.info("release {} semaphore", i);
28+
wfSemaphores.get(i).release();
2229
counter++;
23-
event.waitOne();
30+
latch.await();
2431
return i;
2532
}
26-
2733
}

src/test/java/dev/dbos/transact/queue/QueuesTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import dev.dbos.transact.database.SystemDatabase;
1313
import dev.dbos.transact.execution.DBOSExecutor;
1414
import dev.dbos.transact.utils.DBUtils;
15-
import dev.dbos.transact.utils.ManualResetEvent;
1615
import dev.dbos.transact.workflow.WorkflowHandle;
1716
import dev.dbos.transact.workflow.WorkflowState;
1817
import dev.dbos.transact.workflow.WorkflowStatus;
@@ -26,6 +25,7 @@
2625
import java.time.temporal.ChronoUnit;
2726
import java.util.ArrayList;
2827
import java.util.List;
28+
import java.util.concurrent.Semaphore;
2929

3030
import javax.sql.DataSource;
3131

@@ -514,9 +514,9 @@ public void testQueueConcurrencyUnderRecovery() throws Exception {
514514
handle3 = dbos.startWorkflow(() -> service.noopWorkflow(2));
515515
}
516516

517-
for (ManualResetEvent e : impl.wfEvents) {
518-
e.waitOne();
519-
e.reset();
517+
for (Semaphore e : impl.wfSemaphores) {
518+
e.acquire();
519+
e.drainPermits();
520520
}
521521

522522
assertEquals(2, impl.counter);
@@ -551,22 +551,22 @@ public void testQueueConcurrencyUnderRecovery() throws Exception {
551551
assertTrue(expectedWorkflowIds.contains(localHandles.get(0).getWorkflowId()));
552552
assertTrue(expectedWorkflowIds.contains(localHandles.get(1).getWorkflowId()));
553553

554-
for (ManualResetEvent e : impl.wfEvents) {
555-
e.waitOne();
554+
for (int i = 0; i < impl.wfSemaphores.size(); i++) {
555+
logger.info("acquire {} semaphore", i);
556+
impl.wfSemaphores.get(i).acquire();
556557
}
557558

558559
assertEquals(4, impl.counter);
559560
assertEquals(WorkflowState.PENDING.toString(), handle1.getStatus().getStatus());
560561
assertEquals(WorkflowState.PENDING.toString(), handle2.getStatus().getStatus());
561562
assertEquals(WorkflowState.ENQUEUED.toString(), handle3.getStatus().getStatus());
562563

563-
impl.event.set();
564+
impl.latch.countDown();
564565
assertEquals(0, handle1.getResult());
565566
assertEquals(1, handle2.getResult());
566567
assertEquals(2, handle3.getResult());
567568
assertEquals("local", handle3.getStatus().getExecutorId());
568569

569570
assertTrue(DBUtils.queueEntriesAreCleanedUp(dataSource));
570-
571571
}
572572
}

src/test/java/dev/dbos/transact/utils/ManualResetEvent.java

Lines changed: 0 additions & 74 deletions
This file was deleted.

0 commit comments

Comments
 (0)