|
5 | 5 | import static org.junit.jupiter.api.Assertions.assertNotNull; |
6 | 6 | import static org.junit.jupiter.api.Assertions.assertTrue; |
7 | 7 |
|
| 8 | +import dev.dbos.transact.Constants; |
8 | 9 | import dev.dbos.transact.DBOS; |
| 10 | +import dev.dbos.transact.DBOSTestAccess; |
9 | 11 | import dev.dbos.transact.config.DBOSConfig; |
10 | 12 | import dev.dbos.transact.utils.PgContainer; |
11 | 13 |
|
| 14 | +import java.sql.Connection; |
| 15 | +import java.sql.PreparedStatement; |
| 16 | +import java.sql.SQLException; |
12 | 17 | import java.time.Duration; |
| 18 | +import java.time.Instant; |
13 | 19 | import java.util.List; |
14 | 20 | import java.util.concurrent.ConcurrentLinkedQueue; |
15 | 21 | import java.util.concurrent.CountDownLatch; |
@@ -359,4 +365,48 @@ public void reDebounceAfterWindowCloses() throws Exception { |
359 | 365 | // Each window produces an independent user workflow. |
360 | 366 | assertNotEquals(h1.workflowId(), h2.workflowId()); |
361 | 367 | } |
| 368 | + |
| 369 | + // Recovering/replaying the internal debouncer workflow must be idempotent: it reuses the |
| 370 | + // pre-assigned user workflow id and must not start a second user workflow execution. |
| 371 | + @Test |
| 372 | + public void recoveryDoesNotRestartUserWorkflow() throws Exception { |
| 373 | + DebouncedService svc = dbos.registerProxy(DebouncedService.class, serviceImpl); |
| 374 | + dbos.launch(); |
| 375 | + |
| 376 | + var handle = |
| 377 | + dbos.<String>debouncer().debounce("rec-key", Duration.ofMillis(300), () -> svc.process("v1")); |
| 378 | + String userWorkflowId = handle.workflowId(); |
| 379 | + assertEquals("result:v1", handle.getResult()); |
| 380 | + assertEquals(1, serviceImpl.callCount()); |
| 381 | + |
| 382 | + // Simulate a crash where the debouncer ran but did not durably record completion: flip only |
| 383 | + // the debouncer workflow back to PENDING (the user workflow stays SUCCESS) and recover it. |
| 384 | + var executor = DBOSTestAccess.getDbosExecutor(dbos); |
| 385 | + markDebouncerPending(); |
| 386 | + |
| 387 | + var recovered = executor.recoverPendingWorkflows(List.of(executor.executorId())); |
| 388 | + assertEquals(1, recovered.size()); |
| 389 | + for (var h : recovered) { |
| 390 | + h.getResult(); |
| 391 | + } |
| 392 | + |
| 393 | + // Replay reused the same user workflow id and did not run the user workflow again. |
| 394 | + assertEquals(1, serviceImpl.callCount()); |
| 395 | + assertEquals(List.of("v1"), serviceImpl.callArgs()); |
| 396 | + WorkflowHandle<String, Exception> userHandle = dbos.retrieveWorkflow(userWorkflowId); |
| 397 | + assertEquals("result:v1", userHandle.getResult()); |
| 398 | + assertEquals(WorkflowState.SUCCESS, userHandle.getStatus().status()); |
| 399 | + } |
| 400 | + |
| 401 | + private void markDebouncerPending() throws SQLException { |
| 402 | + var sql = |
| 403 | + "UPDATE dbos.workflow_status SET status = ?, queue_name = NULL, updated_at = ? WHERE name = ?"; |
| 404 | + try (Connection conn = pgContainer.dataSource().getConnection(); |
| 405 | + PreparedStatement stmt = conn.prepareStatement(sql)) { |
| 406 | + stmt.setString(1, WorkflowState.PENDING.name()); |
| 407 | + stmt.setLong(2, Instant.now().toEpochMilli()); |
| 408 | + stmt.setString(3, Constants.DEBOUNCER_WORKFLOW_NAME); |
| 409 | + stmt.executeUpdate(); |
| 410 | + } |
| 411 | + } |
362 | 412 | } |
0 commit comments