Skip to content

Commit dee7b87

Browse files
committed
Test withDeduplicationId forwarding and harden debouncer recovery test
1 parent fa57962 commit dee7b87

1 file changed

Lines changed: 91 additions & 9 deletions

File tree

transact/src/test/java/dev/dbos/transact/workflow/DebouncerTest.java

Lines changed: 91 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,23 @@ public interface DebouncedService {
4646
public static class DebouncedServiceImpl implements DebouncedService {
4747
private final AtomicInteger callCount = new AtomicInteger();
4848
private final ConcurrentLinkedQueue<String> callArgs = new ConcurrentLinkedQueue<>();
49+
// When set, the workflow blocks here while running so tests can inspect its in-flight status.
50+
volatile CountDownLatch gate;
4951

5052
@Override
5153
@Workflow
5254
public String process(String input) {
5355
callCount.incrementAndGet();
5456
callArgs.add(input);
57+
if (gate != null) {
58+
try {
59+
// Ceiling only; the test counts the gate down as soon as it has observed the status.
60+
// Must exceed the observation window so the workflow stays in-flight until then.
61+
gate.await(60, TimeUnit.SECONDS);
62+
} catch (InterruptedException e) {
63+
Thread.currentThread().interrupt();
64+
}
65+
}
5566
return "result:" + input;
5667
}
5768

@@ -381,32 +392,103 @@ public void recoveryDoesNotRestartUserWorkflow() throws Exception {
381392

382393
// Simulate a crash where the debouncer ran but did not durably record completion: flip only
383394
// the debouncer workflow back to PENDING (the user workflow stays SUCCESS) and recover it.
395+
// The debouncer finishes asynchronously after starting the user workflow, so wait until it is
396+
// SUCCESS before flipping — otherwise the flip would race its own completion.
384397
var executor = DBOSTestAccess.getDbosExecutor(dbos);
385-
markDebouncerPending();
398+
awaitDebouncerFlippedToPending(Duration.ofSeconds(30));
386399

387400
var recovered = executor.recoverPendingWorkflows(List.of(executor.executorId()));
388401
assertEquals(1, recovered.size());
389402
for (var h : recovered) {
390403
h.getResult();
391404
}
392405

393-
// Replay reused the same user workflow id and did not run the user workflow again.
406+
// Replay reused the same user workflow id and did not run the user workflow again. The count
407+
// check is independent of timing: a second user workflow would create a row at enqueue/start
408+
// time, before it could execute, so it would be caught even if its body had not run yet.
409+
assertEquals(1, countWorkflowsByName("process"));
394410
assertEquals(1, serviceImpl.callCount());
395411
assertEquals(List.of("v1"), serviceImpl.callArgs());
396412
WorkflowHandle<String, Exception> userHandle = dbos.retrieveWorkflow(userWorkflowId);
397413
assertEquals("result:v1", userHandle.getResult());
398414
assertEquals(WorkflowState.SUCCESS, userHandle.getStatus().status());
399415
}
400416

401-
private void markDebouncerPending() throws SQLException {
402-
var sql =
403-
"UPDATE dbos.workflow_status SET status = ?, queue_name = NULL, updated_at = ? WHERE name = ?";
417+
private int countWorkflowsByName(String name) throws SQLException {
418+
var sql = "SELECT count(*) FROM dbos.workflow_status WHERE name = ?";
404419
try (Connection conn = pgContainer.dataSource().getConnection();
405420
PreparedStatement stmt = conn.prepareStatement(sql)) {
406-
stmt.setString(1, WorkflowState.PENDING.name());
407-
stmt.setLong(2, Instant.now().toEpochMilli());
408-
stmt.setString(3, Constants.DEBOUNCER_WORKFLOW_NAME);
409-
stmt.executeUpdate();
421+
stmt.setString(1, name);
422+
try (var rs = stmt.executeQuery()) {
423+
rs.next();
424+
return rs.getInt(1);
425+
}
426+
}
427+
}
428+
429+
// withDeduplicationId must forward the id to the queued user workflow.
430+
@Test
431+
public void deduplicationIdForwardedToQueuedUserWorkflow() throws Exception {
432+
DebouncedService svc = dbos.registerProxy(DebouncedService.class, serviceImpl);
433+
Queue userQueue = new Queue("dedup-user-queue");
434+
dbos.registerQueue(userQueue);
435+
serviceImpl.gate = new CountDownLatch(1);
436+
dbos.launch();
437+
438+
String dedupId = "user-dedup-1";
439+
var handle =
440+
dbos.<String>debouncer()
441+
.withQueue(userQueue)
442+
.withDeduplicationId(dedupId)
443+
.debounce("dd-key", Duration.ofMillis(300), () -> svc.process("v1"));
444+
445+
// The user workflow blocks on the gate while running, so its deduplication_id is still set
446+
// (it is cleared only on completion). Wait for it to appear, then assert it was forwarded.
447+
String observed = awaitDeduplicationId(handle, Duration.ofSeconds(30));
448+
assertEquals(dedupId, observed);
449+
450+
serviceImpl.gate.countDown();
451+
assertEquals("result:v1", handle.getResult());
452+
assertEquals(1, serviceImpl.callCount());
453+
}
454+
455+
private String awaitDeduplicationId(WorkflowHandle<String, ?> handle, Duration timeout)
456+
throws InterruptedException {
457+
long deadline = System.currentTimeMillis() + timeout.toMillis();
458+
while (System.currentTimeMillis() < deadline) {
459+
try {
460+
var status = handle.getStatus();
461+
if (status != null && status.deduplicationId() != null) {
462+
return status.deduplicationId();
463+
}
464+
} catch (RuntimeException ignored) {
465+
// status row not present yet
466+
}
467+
Thread.sleep(50);
468+
}
469+
throw new AssertionError("user workflow deduplicationId not observed within timeout");
470+
}
471+
472+
// Flip the (completed) debouncer workflow back to PENDING, retrying until it has reached SUCCESS
473+
// so the result is deterministic regardless of how the debouncer's async completion interleaves.
474+
private void awaitDebouncerFlippedToPending(Duration timeout) throws Exception {
475+
var sql =
476+
"UPDATE dbos.workflow_status SET status = ?, queue_name = NULL, updated_at = ?"
477+
+ " WHERE name = ? AND status = ?";
478+
long deadline = System.currentTimeMillis() + timeout.toMillis();
479+
while (System.currentTimeMillis() < deadline) {
480+
try (Connection conn = pgContainer.dataSource().getConnection();
481+
PreparedStatement stmt = conn.prepareStatement(sql)) {
482+
stmt.setString(1, WorkflowState.PENDING.name());
483+
stmt.setLong(2, Instant.now().toEpochMilli());
484+
stmt.setString(3, Constants.DEBOUNCER_WORKFLOW_NAME);
485+
stmt.setString(4, WorkflowState.SUCCESS.name());
486+
if (stmt.executeUpdate() == 1) {
487+
return;
488+
}
489+
}
490+
Thread.sleep(50);
410491
}
492+
throw new AssertionError("debouncer workflow did not reach SUCCESS within timeout");
411493
}
412494
}

0 commit comments

Comments
 (0)