Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,36 @@ SELECT COUNT(*)
maxTasks = Math.min(maxTasks, availableTasks);
}

// Version-less workflows (application_version IS NULL) are only dequeued
// when this worker is running the latest registered application version.
boolean isLatestVersion = true;
String latestVersionQuery =
"""
SELECT version_name FROM "%s".application_versions
ORDER BY version_timestamp DESC LIMIT 1
"""
.formatted(ctx.schema());
try (var ps = connection.prepareStatement(latestVersionQuery);
ResultSet rs = ps.executeQuery()) {
if (rs.next()) {
isLatestVersion = rs.getString(1).equals(appVersion);
}
}

String versionClause =
isLatestVersion
? "(application_version = ? OR application_version IS NULL)"
: "application_version = ?";

var query =
"""
SELECT workflow_uuid
FROM "%s".workflow_status
WHERE queue_name = ?
AND status = ?
AND (application_version = ? OR application_version IS NULL)
AND %s
"""
.formatted(ctx.schema());
.formatted(ctx.schema(), versionClause);
if (partitionKey != null) {
query += " AND queue_partition_key = ?";
}
Expand Down
50 changes: 50 additions & 0 deletions transact/src/test/java/dev/dbos/transact/client/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import dev.dbos.transact.DBOS;
import dev.dbos.transact.DBOSClient;
import dev.dbos.transact.DBOSTestAccess;
import dev.dbos.transact.StartWorkflowOptions;
import dev.dbos.transact.config.DBOSConfig;
import dev.dbos.transact.exceptions.DBOSAwaitedWorkflowCancelledException;
import dev.dbos.transact.exceptions.DBOSNonExistentWorkflowException;
Expand Down Expand Up @@ -508,6 +509,55 @@ public void authFlowsFromEnqueuedParentToChild() throws Exception {
}
}

@Test
public void versionlessWorkflowDequeuedOnlyWhenWorkerIsLatest() throws Exception {
// The worker runs app version "v1.0.0" (set in beforeEach), registered at launch.
var sysdb = DBOSTestAccess.getSystemDatabase(dbos);
String workerVersion = DBOSTestAccess.getDbosExecutor(dbos).appVersion();
assertEquals("v1.0.0", workerVersion);

// Register a newer version so this worker is NOT running the latest registered version.
sysdb.createApplicationVersion("v2.0.0");
sysdb.updateApplicationVersionTimestamp("v2.0.0", Instant.now().plus(Duration.ofHours(1)));
assertEquals("v2.0.0", sysdb.getLatestApplicationVersion().versionName());

// Enqueue a version-less workflow via the client (application_version stays NULL).
String versionlessId = "versionless-" + UUID.randomUUID();
try (var client = pgContainer.dbosClient()) {
var options =
new DBOSClient.EnqueueOptions("enqueueTest", "ClientServiceImpl", "testQueue")
.withWorkflowId(versionlessId);
client.enqueueWorkflow(options, new Object[] {1, "versionless"});
}
var versionlessHandle = dbos.retrieveWorkflow(versionlessId);
var versionlessRow = DBUtils.getWorkflowRow(dataSource, versionlessId);
assertEquals(WorkflowState.ENQUEUED.name(), versionlessRow.status());
assertNull(versionlessRow.applicationVersion());

// Enqueue a workflow tagged with the worker's current version via the in-process API.
String taggedId = "tagged-" + UUID.randomUUID();
var taggedHandle =
dbos.startWorkflow(
() -> service.enqueueTest(2, "tagged"),
new StartWorkflowOptions(taggedId).withQueue("testQueue"));

// The version-matched workflow is dequeued and completes even though the worker is not latest.
assertEquals("2-tagged", taggedHandle.getResult());
assertEquals(workerVersion, DBUtils.getWorkflowRow(dataSource, taggedId).applicationVersion());

// The version-less workflow must NOT be dequeued while the worker is not the latest version.
Thread.sleep(2000); // allow several queue poll cycles to run
assertEquals(WorkflowState.ENQUEUED, versionlessHandle.getStatus().status());
assertNull(DBUtils.getWorkflowRow(dataSource, versionlessId).applicationVersion());

// Promote the worker's version to latest; the version-less workflow should now run.
sysdb.updateApplicationVersionTimestamp(workerVersion, Instant.now().plus(Duration.ofHours(2)));
assertEquals(workerVersion, sysdb.getLatestApplicationVersion().versionName());

assertEquals("1-versionless", versionlessHandle.getResult());
assertEquals(WorkflowState.SUCCESS, versionlessHandle.getStatus().status());
}

@Test
public void enqueueOptionsNoTimeoutOrDeadlineWrittenToDb() throws Exception {
var qs = DBOSTestAccess.getQueueService(dbos);
Expand Down