Skip to content

Commit 0e51c0b

Browse files
committed
more tests
1 parent 3330cd7 commit 0e51c0b

3 files changed

Lines changed: 813 additions & 78 deletions

File tree

transact/src/test/java/dev/dbos/transact/queue/DynamicQueuesTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,11 @@ public void testDeleteAndRecreateQueue() throws Exception {
212212
dbos.deleteQueue("q-lifecycle");
213213
assertFalse(dbos.listQueues().stream().anyMatch(x -> x.name().equals("q-lifecycle")));
214214

215+
// Wait for the old listener to detect the deletion and remove itself from the
216+
// active-listener set. Without this wait the supervisor may not start a fresh
217+
// listener for the recreated queue (dbListeningQueues still contains the name).
218+
Thread.sleep(500);
219+
215220
// Recreate with different config.
216221
dbos.registerQueue("q-lifecycle", QueueOptions.setConcurrency(2));
217222
var recreated =
@@ -251,6 +256,51 @@ public void testStaticAndDynamicQueueSameName() throws Exception {
251256
assertEquals(WorkflowState.SUCCESS, handle.getStatus().status());
252257
}
253258

259+
@Test
260+
public void testDynamicConcurrencyTakesEffect() throws Exception {
261+
ConcurrencyTestServiceImpl impl = new ConcurrencyTestServiceImpl();
262+
ConcurrencyTestService service = dbos.registerProxy(ConcurrencyTestService.class, impl);
263+
dbos.launch();
264+
265+
var qs = DBOSTestAccess.getQueueService(dbos);
266+
qs.setSpeedupForTest();
267+
268+
// Start with concurrency=1 so only one workflow dequeues at a time.
269+
dbos.registerQueue("dyn-update-q", QueueOptions.setConcurrency(1));
270+
271+
var h1 =
272+
dbos.startWorkflow(
273+
() -> service.blockedWorkflow(0),
274+
new StartWorkflowOptions("dyn-wf1").withQueue("dyn-update-q"));
275+
var h2 =
276+
dbos.startWorkflow(
277+
() -> service.blockedWorkflow(1),
278+
new StartWorkflowOptions("dyn-wf2").withQueue("dyn-update-q"));
279+
var h3 =
280+
dbos.startWorkflow(
281+
() -> service.blockedWorkflow(2),
282+
new StartWorkflowOptions("dyn-wf3").withQueue("dyn-update-q"));
283+
284+
// Wait for exactly one workflow to be dequeued and start running.
285+
impl.wfSemaphore.acquire(1);
286+
287+
// With concurrency=1 the other two should still be waiting.
288+
Thread.sleep(200);
289+
assertEquals(WorkflowState.ENQUEUED, h2.getStatus().status());
290+
assertEquals(WorkflowState.ENQUEUED, h3.getStatus().status());
291+
292+
// Bump concurrency. The runner reloads queue settings on its next poll and
293+
// should immediately dequeue the remaining two workflows.
294+
dbos.updateQueue("dyn-update-q", QueueOptions.setConcurrency(3));
295+
impl.wfSemaphore.acquire(2);
296+
297+
// Release all blocked workflows and verify they complete successfully.
298+
impl.latch.countDown();
299+
assertEquals(0, h1.getResult());
300+
assertEquals(1, h2.getResult());
301+
assertEquals(2, h3.getResult());
302+
}
303+
254304
@Test
255305
public void testRegisterQueueValidation() throws Exception {
256306
dbos.launch();

0 commit comments

Comments
 (0)