diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index 216cc80f..40be94cb 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -9,6 +9,7 @@ import dev.dbos.transact.workflow.StepInfo; import dev.dbos.transact.workflow.WorkflowHandle; import dev.dbos.transact.workflow.WorkflowSchedule; +import dev.dbos.transact.workflow.WorkflowState; import dev.dbos.transact.workflow.WorkflowStatus; import java.io.ByteArrayInputStream; @@ -567,6 +568,9 @@ void setPingInterval(WebSocket ws) { } }); + if (pingTimeout != null) { + pingTimeout.cancel(false); + } pingTimeout = scheduler.schedule( () -> { @@ -935,8 +939,11 @@ static CompletableFuture handleExistPendingWorkflows( ExistPendingWorkflowsRequest request = (ExistPendingWorkflowsRequest) message; try { var pending = - conductor.systemDatabase.getPendingWorkflows( - List.of(request.executor_id), request.application_version); + conductor.systemDatabase.listWorkflows( + new ListWorkflowsInput() + .withStatus(WorkflowState.PENDING) + .withExecutorIds(List.of(request.executor_id)) + .withApplicationVersion(request.application_version)); return new ExistPendingWorkflowsResponse(request, !pending.isEmpty()); } catch (Exception e) { logger.error("Exception encountered when checking for pending workflows", e); diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java index dfa0483b..b0d100f3 100644 --- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -373,10 +373,6 @@ public List getWorkflowAggregates(GetWorkflowAggregatesInp return dbRetry(() -> WorkflowDAO.getWorkflowAggregates(ctx, input)); } - public List getPendingWorkflows(List executorIds, String appVersion) { - return dbRetry(() -> WorkflowDAO.getPendingWorkflows(ctx, executorIds, appVersion)); - } - public boolean clearQueueAssignment(String workflowId) { return dbRetry(() -> QueuesDAO.clearQueueAssignment(ctx, workflowId)); } diff --git a/transact/src/main/java/dev/dbos/transact/database/dao/WorkflowDAO.java b/transact/src/main/java/dev/dbos/transact/database/dao/WorkflowDAO.java index 08408d81..42a2f801 100644 --- a/transact/src/main/java/dev/dbos/transact/database/dao/WorkflowDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/dao/WorkflowDAO.java @@ -822,16 +822,6 @@ private static WorkflowStatus resultsToWorkflowStatus( return info; } - public static List getPendingWorkflows( - DbContext ctx, List executorIds, String appVersion) throws SQLException { - var input = - new ListWorkflowsInput() - .withStatus(WorkflowState.PENDING) - .withExecutorIds(executorIds) - .withApplicationVersion(appVersion); - return listWorkflows(ctx, input); - } - @SuppressWarnings("unchecked") public static Result awaitWorkflowResult( DbContext ctx, Duration dbPollingInterval, String workflowId) throws SQLException { diff --git a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java index e5036c12..ba1daa6e 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java +++ b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java @@ -235,10 +235,19 @@ public void start( listener.dbosLaunched(dbos); } + var recoveryQuery = + new ListWorkflowsInput() + .withStatus(WorkflowState.PENDING) + .withExecutorIds(List.of(executorId())) + .withApplicationVersion(appVersion) + .withEndTime(Instant.now()); Runnable recoveryTask = () -> { try { - recoverPendingWorkflows(List.of(executorId())); + var workflows = systemDatabase.listWorkflows(recoveryQuery); + for (var wf : workflows) { + recoverWorkflow(wf.workflowId(), wf.queueName()); + } } catch (Throwable t) { logger.error("Recovery task failed", t); } @@ -960,7 +969,12 @@ public boolean deprecatePatch(String patchName) { public List> recoverPendingWorkflows(List executorIds) { Objects.requireNonNull(executorIds); - var workflows = systemDatabase.getPendingWorkflows(executorIds, appVersion); + var input = + new ListWorkflowsInput() + .withStatus(WorkflowState.PENDING) + .withExecutorIds(executorIds) + .withApplicationVersion(appVersion); + var workflows = systemDatabase.listWorkflows(input); return workflows.stream() .map(wf -> recoverWorkflow(wf.workflowId(), wf.queueName())) .collect(Collectors.toList()); diff --git a/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java b/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java index 1346a5ca..937d7547 100644 --- a/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java +++ b/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java @@ -158,7 +158,7 @@ public void onClose(WebSocket conn, int code, String reason, boolean remote) { Listener listener = new Listener(); testServer.setListener(listener); - builder.pingPeriodMs(2000).pingTimeoutMs(1000); + builder.pingPeriodMs(2000).pingTimeoutMs(5000); try (Conductor conductor = builder.build()) { conductor.start(); @@ -1538,13 +1538,12 @@ public void canExistPendingWorkflows() throws Exception { testServer.setListener(listener); String executorId = "exec-id"; String appVersion = "app-version"; - var executorIds = List.of(executorId); List outputs = new ArrayList<>(); outputs.add(new WorkflowStatusBuilder("wf-1").build()); outputs.add(new WorkflowStatusBuilder("wf-2").queueName("queue").build()); - when(mockDB.getPendingWorkflows(executorIds, appVersion)).thenReturn(outputs); + when(mockDB.listWorkflows(any(ListWorkflowsInput.class))).thenReturn(outputs); try (Conductor conductor = builder.build()) { conductor.start(); @@ -1556,7 +1555,7 @@ public void canExistPendingWorkflows() throws Exception { listener.send(MessageType.EXIST_PENDING_WORKFLOWS, "12345", message); assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out"); - verify(mockDB).getPendingWorkflows(executorIds, appVersion); + verify(mockDB).listWorkflows(any(ListWorkflowsInput.class)); JsonNode jsonNode = mapper.readTree(listener.message); assertNotNull(jsonNode); @@ -1572,9 +1571,8 @@ public void canExistPendingWorkflowsFalse() throws Exception { testServer.setListener(listener); String executorId = "exec-id"; String appVersion = "app-version"; - var executorIds = List.of(executorId); - when(mockDB.getPendingWorkflows(executorIds, appVersion)).thenReturn(List.of()); + when(mockDB.listWorkflows(any(ListWorkflowsInput.class))).thenReturn(List.of()); try (Conductor conductor = builder.build()) { conductor.start(); @@ -1592,7 +1590,7 @@ public void canExistPendingWorkflowsFalse() throws Exception { listener.send(MessageType.EXIST_PENDING_WORKFLOWS, "12345", message); assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out"); - verify(mockDB).getPendingWorkflows(executorIds, appVersion); + verify(mockDB).listWorkflows(any(ListWorkflowsInput.class)); JsonNode jsonNode = mapper.readTree(listener.message); assertNotNull(jsonNode); diff --git a/transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java b/transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java index f5b93f30..82eafb13 100644 --- a/transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java +++ b/transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java @@ -14,6 +14,7 @@ import dev.dbos.transact.workflow.*; import java.util.List; +import java.util.UUID; import com.zaxxer.hikari.HikariDataSource; import org.junit.jupiter.api.Assumptions; @@ -302,7 +303,7 @@ public void sleepRecovery() throws Exception { dbos.launch(); var dbosExecutor = DBOSTestAccess.getDbosExecutor(dbos); - String wfid = "wf-123"; + String wfid = UUID.randomUUID().toString(); try (var id = new WorkflowOptions(wfid).setContext()) { executingService.sleepingWorkflow(.002f); } diff --git a/transact/src/test/java/dev/dbos/transact/execution/RecoveryServiceTest.java b/transact/src/test/java/dev/dbos/transact/execution/RecoveryServiceTest.java index 635406e3..50814709 100644 --- a/transact/src/test/java/dev/dbos/transact/execution/RecoveryServiceTest.java +++ b/transact/src/test/java/dev/dbos/transact/execution/RecoveryServiceTest.java @@ -9,6 +9,7 @@ import dev.dbos.transact.context.WorkflowOptions; import dev.dbos.transact.utils.DBUtils; import dev.dbos.transact.utils.PgContainer; +import dev.dbos.transact.workflow.ListWorkflowsInput; import dev.dbos.transact.workflow.Queue; import dev.dbos.transact.workflow.WorkflowHandle; import dev.dbos.transact.workflow.WorkflowState; @@ -96,8 +97,11 @@ void recoverWorkflows() throws Exception { setWorkflowStateToPending(dataSource); var pending = - systemDatabase.getPendingWorkflows( - List.of(dbosExecutor.executorId()), dbosExecutor.appVersion()); + systemDatabase.listWorkflows( + new ListWorkflowsInput() + .withStatus(WorkflowState.PENDING) + .withExecutorIds(List.of(dbosExecutor.executorId())) + .withApplicationVersion(dbosExecutor.appVersion())); assertEquals(5, pending.size()); diff --git a/transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java b/transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java index 1f7cabd6..9fdb3590 100644 --- a/transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java +++ b/transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java @@ -371,7 +371,7 @@ public void testLimiter() throws Exception { times.add(result); } - double waveTolerance = 0.5; + double waveTolerance = 1.0; for (int wave = 0; wave < numWaves; wave++) { for (int i = wave * limit; i < (wave + 1) * limit - 1; i++) { double diff = times.get(i + 1) - times.get(i);