diff --git a/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java b/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java index d12e9f3c..91e9aecc 100644 --- a/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java @@ -134,15 +134,36 @@ SELECT COUNT(*) maxTasks = Math.min(maxTasks, availableTasks); } + // Version-less workflows (application_version IS NULL) are only dequeued + // when this worker is running the latest registered application version. + boolean isLatestVersion = true; + String latestVersionQuery = + """ + SELECT version_name FROM "%s".application_versions + ORDER BY version_timestamp DESC LIMIT 1 + """ + .formatted(ctx.schema()); + try (var ps = connection.prepareStatement(latestVersionQuery); + ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + isLatestVersion = rs.getString(1).equals(appVersion); + } + } + + String versionClause = + isLatestVersion + ? "(application_version = ? OR application_version IS NULL)" + : "application_version = ?"; + var query = """ SELECT workflow_uuid FROM "%s".workflow_status WHERE queue_name = ? AND status = ? - AND (application_version = ? OR application_version IS NULL) + AND %s """ - .formatted(ctx.schema()); + .formatted(ctx.schema(), versionClause); if (partitionKey != null) { query += " AND queue_partition_key = ?"; } diff --git a/transact/src/test/java/dev/dbos/transact/client/ClientTest.java b/transact/src/test/java/dev/dbos/transact/client/ClientTest.java index 744377d4..56ddf9dc 100644 --- a/transact/src/test/java/dev/dbos/transact/client/ClientTest.java +++ b/transact/src/test/java/dev/dbos/transact/client/ClientTest.java @@ -9,6 +9,7 @@ import dev.dbos.transact.DBOS; import dev.dbos.transact.DBOSClient; import dev.dbos.transact.DBOSTestAccess; +import dev.dbos.transact.StartWorkflowOptions; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.exceptions.DBOSAwaitedWorkflowCancelledException; import dev.dbos.transact.exceptions.DBOSNonExistentWorkflowException; @@ -508,6 +509,55 @@ public void authFlowsFromEnqueuedParentToChild() throws Exception { } } + @Test + public void versionlessWorkflowDequeuedOnlyWhenWorkerIsLatest() throws Exception { + // The worker runs app version "v1.0.0" (set in beforeEach), registered at launch. + var sysdb = DBOSTestAccess.getSystemDatabase(dbos); + String workerVersion = DBOSTestAccess.getDbosExecutor(dbos).appVersion(); + assertEquals("v1.0.0", workerVersion); + + // Register a newer version so this worker is NOT running the latest registered version. + sysdb.createApplicationVersion("v2.0.0"); + sysdb.updateApplicationVersionTimestamp("v2.0.0", Instant.now().plus(Duration.ofHours(1))); + assertEquals("v2.0.0", sysdb.getLatestApplicationVersion().versionName()); + + // Enqueue a version-less workflow via the client (application_version stays NULL). + String versionlessId = "versionless-" + UUID.randomUUID(); + try (var client = pgContainer.dbosClient()) { + var options = + new DBOSClient.EnqueueOptions("enqueueTest", "ClientServiceImpl", "testQueue") + .withWorkflowId(versionlessId); + client.enqueueWorkflow(options, new Object[] {1, "versionless"}); + } + var versionlessHandle = dbos.retrieveWorkflow(versionlessId); + var versionlessRow = DBUtils.getWorkflowRow(dataSource, versionlessId); + assertEquals(WorkflowState.ENQUEUED.name(), versionlessRow.status()); + assertNull(versionlessRow.applicationVersion()); + + // Enqueue a workflow tagged with the worker's current version via the in-process API. + String taggedId = "tagged-" + UUID.randomUUID(); + var taggedHandle = + dbos.startWorkflow( + () -> service.enqueueTest(2, "tagged"), + new StartWorkflowOptions(taggedId).withQueue("testQueue")); + + // The version-matched workflow is dequeued and completes even though the worker is not latest. + assertEquals("2-tagged", taggedHandle.getResult()); + assertEquals(workerVersion, DBUtils.getWorkflowRow(dataSource, taggedId).applicationVersion()); + + // The version-less workflow must NOT be dequeued while the worker is not the latest version. + Thread.sleep(2000); // allow several queue poll cycles to run + assertEquals(WorkflowState.ENQUEUED, versionlessHandle.getStatus().status()); + assertNull(DBUtils.getWorkflowRow(dataSource, versionlessId).applicationVersion()); + + // Promote the worker's version to latest; the version-less workflow should now run. + sysdb.updateApplicationVersionTimestamp(workerVersion, Instant.now().plus(Duration.ofHours(2))); + assertEquals(workerVersion, sysdb.getLatestApplicationVersion().versionName()); + + assertEquals("1-versionless", versionlessHandle.getResult()); + assertEquals(WorkflowState.SUCCESS, versionlessHandle.getStatus().status()); + } + @Test public void enqueueOptionsNoTimeoutOrDeadlineWrittenToDb() throws Exception { var qs = DBOSTestAccess.getQueueService(dbos);