Skip to content

Commit 3330cd7

Browse files
authored
Merge branch 'main' into devhawk/dynamic-queues
2 parents 0ed1106 + 9184328 commit 3330cd7

8 files changed

Lines changed: 39 additions & 29 deletions

File tree

transact/src/main/java/dev/dbos/transact/conductor/Conductor.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import dev.dbos.transact.workflow.StepInfo;
1111
import dev.dbos.transact.workflow.WorkflowHandle;
1212
import dev.dbos.transact.workflow.WorkflowSchedule;
13+
import dev.dbos.transact.workflow.WorkflowState;
1314
import dev.dbos.transact.workflow.WorkflowStatus;
1415

1516
import java.io.ByteArrayInputStream;
@@ -568,6 +569,9 @@ void setPingInterval(WebSocket ws) {
568569
}
569570
});
570571

572+
if (pingTimeout != null) {
573+
pingTimeout.cancel(false);
574+
}
571575
pingTimeout =
572576
scheduler.schedule(
573577
() -> {
@@ -938,8 +942,11 @@ static CompletableFuture<BaseResponse> handleExistPendingWorkflows(
938942
ExistPendingWorkflowsRequest request = (ExistPendingWorkflowsRequest) message;
939943
try {
940944
var pending =
941-
conductor.systemDatabase.getPendingWorkflows(
942-
List.of(request.executor_id), request.application_version);
945+
conductor.systemDatabase.listWorkflows(
946+
new ListWorkflowsInput()
947+
.withStatus(WorkflowState.PENDING)
948+
.withExecutorIds(List.of(request.executor_id))
949+
.withApplicationVersion(request.application_version));
943950
return new ExistPendingWorkflowsResponse(request, !pending.isEmpty());
944951
} catch (Exception e) {
945952
logger.error("Exception encountered when checking for pending workflows", e);

transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -374,10 +374,6 @@ public List<WorkflowAggregateRow> getWorkflowAggregates(GetWorkflowAggregatesInp
374374
return dbRetry(() -> WorkflowDAO.getWorkflowAggregates(ctx, input));
375375
}
376376

377-
public List<WorkflowStatus> getPendingWorkflows(List<String> executorIds, String appVersion) {
378-
return dbRetry(() -> WorkflowDAO.getPendingWorkflows(ctx, executorIds, appVersion));
379-
}
380-
381377
public boolean clearQueueAssignment(String workflowId) {
382378
return dbRetry(() -> QueuesDAO.clearQueueAssignment(ctx, workflowId));
383379
}

transact/src/main/java/dev/dbos/transact/database/dao/WorkflowDAO.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -822,16 +822,6 @@ private static WorkflowStatus resultsToWorkflowStatus(
822822
return info;
823823
}
824824

825-
public static List<WorkflowStatus> getPendingWorkflows(
826-
DbContext ctx, List<String> executorIds, String appVersion) throws SQLException {
827-
var input =
828-
new ListWorkflowsInput()
829-
.withStatus(WorkflowState.PENDING)
830-
.withExecutorIds(executorIds)
831-
.withApplicationVersion(appVersion);
832-
return listWorkflows(ctx, input);
833-
}
834-
835825
@SuppressWarnings("unchecked")
836826
public static <T> Result<T> awaitWorkflowResult(
837827
DbContext ctx, Duration dbPollingInterval, String workflowId) throws SQLException {

transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,10 +237,19 @@ public void start(
237237
listener.dbosLaunched(dbos);
238238
}
239239

240+
var recoveryQuery =
241+
new ListWorkflowsInput()
242+
.withStatus(WorkflowState.PENDING)
243+
.withExecutorIds(List.of(executorId()))
244+
.withApplicationVersion(appVersion)
245+
.withEndTime(Instant.now());
240246
Runnable recoveryTask =
241247
() -> {
242248
try {
243-
recoverPendingWorkflows(List.of(executorId()));
249+
var workflows = systemDatabase.listWorkflows(recoveryQuery);
250+
for (var wf : workflows) {
251+
recoverWorkflow(wf.workflowId(), wf.queueName());
252+
}
244253
} catch (Throwable t) {
245254
logger.error("Recovery task failed", t);
246255
}
@@ -994,7 +1003,12 @@ public boolean deprecatePatch(String patchName) {
9941003
public List<WorkflowHandle<?, ?>> recoverPendingWorkflows(List<String> executorIds) {
9951004
Objects.requireNonNull(executorIds);
9961005

997-
var workflows = systemDatabase.getPendingWorkflows(executorIds, appVersion);
1006+
var input =
1007+
new ListWorkflowsInput()
1008+
.withStatus(WorkflowState.PENDING)
1009+
.withExecutorIds(executorIds)
1010+
.withApplicationVersion(appVersion);
1011+
var workflows = systemDatabase.listWorkflows(input);
9981012
return workflows.stream()
9991013
.map(wf -> recoverWorkflow(wf.workflowId(), wf.queueName()))
10001014
.collect(Collectors.toList());

transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ public void onClose(WebSocket conn, int code, String reason, boolean remote) {
158158
Listener listener = new Listener();
159159
testServer.setListener(listener);
160160

161-
builder.pingPeriodMs(2000).pingTimeoutMs(1000);
161+
builder.pingPeriodMs(2000).pingTimeoutMs(5000);
162162
try (Conductor conductor = builder.build()) {
163163
conductor.start();
164164

@@ -1538,13 +1538,12 @@ public void canExistPendingWorkflows() throws Exception {
15381538
testServer.setListener(listener);
15391539
String executorId = "exec-id";
15401540
String appVersion = "app-version";
1541-
var executorIds = List.of(executorId);
15421541

15431542
List<WorkflowStatus> outputs = new ArrayList<>();
15441543
outputs.add(new WorkflowStatusBuilder("wf-1").build());
15451544
outputs.add(new WorkflowStatusBuilder("wf-2").queueName("queue").build());
15461545

1547-
when(mockDB.getPendingWorkflows(executorIds, appVersion)).thenReturn(outputs);
1546+
when(mockDB.listWorkflows(any(ListWorkflowsInput.class))).thenReturn(outputs);
15481547

15491548
try (Conductor conductor = builder.build()) {
15501549
conductor.start();
@@ -1556,7 +1555,7 @@ public void canExistPendingWorkflows() throws Exception {
15561555
listener.send(MessageType.EXIST_PENDING_WORKFLOWS, "12345", message);
15571556

15581557
assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out");
1559-
verify(mockDB).getPendingWorkflows(executorIds, appVersion);
1558+
verify(mockDB).listWorkflows(any(ListWorkflowsInput.class));
15601559

15611560
JsonNode jsonNode = mapper.readTree(listener.message);
15621561
assertNotNull(jsonNode);
@@ -1572,9 +1571,8 @@ public void canExistPendingWorkflowsFalse() throws Exception {
15721571
testServer.setListener(listener);
15731572
String executorId = "exec-id";
15741573
String appVersion = "app-version";
1575-
var executorIds = List.of(executorId);
15761574

1577-
when(mockDB.getPendingWorkflows(executorIds, appVersion)).thenReturn(List.of());
1575+
when(mockDB.listWorkflows(any(ListWorkflowsInput.class))).thenReturn(List.of());
15781576

15791577
try (Conductor conductor = builder.build()) {
15801578
conductor.start();
@@ -1592,7 +1590,7 @@ public void canExistPendingWorkflowsFalse() throws Exception {
15921590
listener.send(MessageType.EXIST_PENDING_WORKFLOWS, "12345", message);
15931591

15941592
assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out");
1595-
verify(mockDB).getPendingWorkflows(executorIds, appVersion);
1593+
verify(mockDB).listWorkflows(any(ListWorkflowsInput.class));
15961594

15971595
JsonNode jsonNode = mapper.readTree(listener.message);
15981596
assertNotNull(jsonNode);

transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import dev.dbos.transact.workflow.*;
1515

1616
import java.util.List;
17+
import java.util.UUID;
1718

1819
import com.zaxxer.hikari.HikariDataSource;
1920
import org.junit.jupiter.api.Assumptions;
@@ -302,7 +303,7 @@ public void sleepRecovery() throws Exception {
302303
dbos.launch();
303304
var dbosExecutor = DBOSTestAccess.getDbosExecutor(dbos);
304305

305-
String wfid = "wf-123";
306+
String wfid = UUID.randomUUID().toString();
306307
try (var id = new WorkflowOptions(wfid).setContext()) {
307308
executingService.sleepingWorkflow(.002f);
308309
}

transact/src/test/java/dev/dbos/transact/execution/RecoveryServiceTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import dev.dbos.transact.context.WorkflowOptions;
1010
import dev.dbos.transact.utils.DBUtils;
1111
import dev.dbos.transact.utils.PgContainer;
12+
import dev.dbos.transact.workflow.ListWorkflowsInput;
1213
import dev.dbos.transact.workflow.Queue;
1314
import dev.dbos.transact.workflow.WorkflowHandle;
1415
import dev.dbos.transact.workflow.WorkflowState;
@@ -96,8 +97,11 @@ void recoverWorkflows() throws Exception {
9697
setWorkflowStateToPending(dataSource);
9798

9899
var pending =
99-
systemDatabase.getPendingWorkflows(
100-
List.of(dbosExecutor.executorId()), dbosExecutor.appVersion());
100+
systemDatabase.listWorkflows(
101+
new ListWorkflowsInput()
102+
.withStatus(WorkflowState.PENDING)
103+
.withExecutorIds(List.of(dbosExecutor.executorId()))
104+
.withApplicationVersion(dbosExecutor.appVersion()));
101105

102106
assertEquals(5, pending.size());
103107

transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ public void testLimiter() throws Exception {
372372
times.add(result);
373373
}
374374

375-
double waveTolerance = 0.5;
375+
double waveTolerance = 1.0;
376376
for (int wave = 0; wave < numWaves; wave++) {
377377
for (int i = wave * limit; i < (wave + 1) * limit - 1; i++) {
378378
double diff = times.get(i + 1) - times.get(i);

0 commit comments

Comments
 (0)