Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import dev.dbos.transact.workflow.StepInfo;
import dev.dbos.transact.workflow.WorkflowHandle;
import dev.dbos.transact.workflow.WorkflowSchedule;
import dev.dbos.transact.workflow.WorkflowState;
import dev.dbos.transact.workflow.WorkflowStatus;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -567,6 +568,9 @@ void setPingInterval(WebSocket ws) {
}
});

if (pingTimeout != null) {
pingTimeout.cancel(false);
}
pingTimeout =
scheduler.schedule(
() -> {
Expand Down Expand Up @@ -935,8 +939,11 @@ static CompletableFuture<BaseResponse> handleExistPendingWorkflows(
ExistPendingWorkflowsRequest request = (ExistPendingWorkflowsRequest) message;
try {
var pending =
conductor.systemDatabase.getPendingWorkflows(
List.of(request.executor_id), request.application_version);
conductor.systemDatabase.listWorkflows(
new ListWorkflowsInput()
.withStatus(WorkflowState.PENDING)
.withExecutorIds(List.of(request.executor_id))
.withApplicationVersion(request.application_version));
return new ExistPendingWorkflowsResponse(request, !pending.isEmpty());
} catch (Exception e) {
logger.error("Exception encountered when checking for pending workflows", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,6 @@ public List<WorkflowAggregateRow> getWorkflowAggregates(GetWorkflowAggregatesInp
return dbRetry(() -> WorkflowDAO.getWorkflowAggregates(ctx, input));
}

public List<WorkflowStatus> getPendingWorkflows(List<String> executorIds, String appVersion) {
return dbRetry(() -> WorkflowDAO.getPendingWorkflows(ctx, executorIds, appVersion));
}

public boolean clearQueueAssignment(String workflowId) {
return dbRetry(() -> QueuesDAO.clearQueueAssignment(ctx, workflowId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,16 +822,6 @@ private static WorkflowStatus resultsToWorkflowStatus(
return info;
}

public static List<WorkflowStatus> getPendingWorkflows(
DbContext ctx, List<String> executorIds, String appVersion) throws SQLException {
var input =
new ListWorkflowsInput()
.withStatus(WorkflowState.PENDING)
.withExecutorIds(executorIds)
.withApplicationVersion(appVersion);
return listWorkflows(ctx, input);
}

@SuppressWarnings("unchecked")
public static <T> Result<T> awaitWorkflowResult(
DbContext ctx, Duration dbPollingInterval, String workflowId) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,19 @@ public void start(
listener.dbosLaunched(dbos);
}

var recoveryQuery =
new ListWorkflowsInput()
.withStatus(WorkflowState.PENDING)
.withExecutorIds(List.of(executorId()))
.withApplicationVersion(appVersion)
.withEndTime(Instant.now());
Runnable recoveryTask =
() -> {
try {
recoverPendingWorkflows(List.of(executorId()));
var workflows = systemDatabase.listWorkflows(recoveryQuery);
for (var wf : workflows) {
recoverWorkflow(wf.workflowId(), wf.queueName());
}
} catch (Throwable t) {
logger.error("Recovery task failed", t);
}
Expand Down Expand Up @@ -960,7 +969,12 @@ public boolean deprecatePatch(String patchName) {
public List<WorkflowHandle<?, ?>> recoverPendingWorkflows(List<String> executorIds) {
Objects.requireNonNull(executorIds);

var workflows = systemDatabase.getPendingWorkflows(executorIds, appVersion);
var input =
new ListWorkflowsInput()
.withStatus(WorkflowState.PENDING)
.withExecutorIds(executorIds)
.withApplicationVersion(appVersion);
var workflows = systemDatabase.listWorkflows(input);
return workflows.stream()
.map(wf -> recoverWorkflow(wf.workflowId(), wf.queueName()))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void onClose(WebSocket conn, int code, String reason, boolean remote) {
Listener listener = new Listener();
testServer.setListener(listener);

builder.pingPeriodMs(2000).pingTimeoutMs(1000);
builder.pingPeriodMs(2000).pingTimeoutMs(5000);
try (Conductor conductor = builder.build()) {
conductor.start();

Expand Down Expand Up @@ -1538,13 +1538,12 @@ public void canExistPendingWorkflows() throws Exception {
testServer.setListener(listener);
String executorId = "exec-id";
String appVersion = "app-version";
var executorIds = List.of(executorId);

List<WorkflowStatus> outputs = new ArrayList<>();
outputs.add(new WorkflowStatusBuilder("wf-1").build());
outputs.add(new WorkflowStatusBuilder("wf-2").queueName("queue").build());

when(mockDB.getPendingWorkflows(executorIds, appVersion)).thenReturn(outputs);
when(mockDB.listWorkflows(any(ListWorkflowsInput.class))).thenReturn(outputs);

try (Conductor conductor = builder.build()) {
conductor.start();
Expand All @@ -1556,7 +1555,7 @@ public void canExistPendingWorkflows() throws Exception {
listener.send(MessageType.EXIST_PENDING_WORKFLOWS, "12345", message);

assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out");
verify(mockDB).getPendingWorkflows(executorIds, appVersion);
verify(mockDB).listWorkflows(any(ListWorkflowsInput.class));

JsonNode jsonNode = mapper.readTree(listener.message);
assertNotNull(jsonNode);
Expand All @@ -1572,9 +1571,8 @@ public void canExistPendingWorkflowsFalse() throws Exception {
testServer.setListener(listener);
String executorId = "exec-id";
String appVersion = "app-version";
var executorIds = List.of(executorId);

when(mockDB.getPendingWorkflows(executorIds, appVersion)).thenReturn(List.of());
when(mockDB.listWorkflows(any(ListWorkflowsInput.class))).thenReturn(List.of());

try (Conductor conductor = builder.build()) {
conductor.start();
Expand All @@ -1592,7 +1590,7 @@ public void canExistPendingWorkflowsFalse() throws Exception {
listener.send(MessageType.EXIST_PENDING_WORKFLOWS, "12345", message);

assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out");
verify(mockDB).getPendingWorkflows(executorIds, appVersion);
verify(mockDB).listWorkflows(any(ListWorkflowsInput.class));

JsonNode jsonNode = mapper.readTree(listener.message);
assertNotNull(jsonNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import dev.dbos.transact.workflow.*;

import java.util.List;
import java.util.UUID;

import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.Assumptions;
Expand Down Expand Up @@ -302,7 +303,7 @@ public void sleepRecovery() throws Exception {
dbos.launch();
var dbosExecutor = DBOSTestAccess.getDbosExecutor(dbos);

String wfid = "wf-123";
String wfid = UUID.randomUUID().toString();
try (var id = new WorkflowOptions(wfid).setContext()) {
executingService.sleepingWorkflow(.002f);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import dev.dbos.transact.context.WorkflowOptions;
import dev.dbos.transact.utils.DBUtils;
import dev.dbos.transact.utils.PgContainer;
import dev.dbos.transact.workflow.ListWorkflowsInput;
import dev.dbos.transact.workflow.Queue;
import dev.dbos.transact.workflow.WorkflowHandle;
import dev.dbos.transact.workflow.WorkflowState;
Expand Down Expand Up @@ -96,8 +97,11 @@ void recoverWorkflows() throws Exception {
setWorkflowStateToPending(dataSource);

var pending =
systemDatabase.getPendingWorkflows(
List.of(dbosExecutor.executorId()), dbosExecutor.appVersion());
systemDatabase.listWorkflows(
new ListWorkflowsInput()
.withStatus(WorkflowState.PENDING)
.withExecutorIds(List.of(dbosExecutor.executorId()))
.withApplicationVersion(dbosExecutor.appVersion()));

assertEquals(5, pending.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ public void testLimiter() throws Exception {
times.add(result);
}

double waveTolerance = 0.5;
double waveTolerance = 1.0;
for (int wave = 0; wave < numWaves; wave++) {
for (int i = wave * limit; i < (wave + 1) * limit - 1; i++) {
double diff = times.get(i + 1) - times.get(i);
Expand Down