From b98bba6ca78a2dee3b7bd983a0306b80a6d6f229 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 1 Jul 2026 10:48:00 -0700 Subject: [PATCH 1/5] No-version workflows are only dequeued by latest-version executors --- .../dbos/transact/database/dao/QueuesDAO.java | 11 ++-- .../dev/dbos/transact/client/ClientTest.java | 50 +++++++++++++++++++ 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java b/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java index d12e9f3c..dcf6848a 100644 --- a/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java @@ -134,15 +134,19 @@ SELECT COUNT(*) maxTasks = Math.min(maxTasks, availableTasks); } + // Version-less workflows (application_version IS NULL) are only dequeued + // when this worker is running the latest registered application version. var query = """ SELECT workflow_uuid FROM "%s".workflow_status WHERE queue_name = ? AND status = ? - AND (application_version = ? OR application_version IS NULL) + AND (application_version = ? OR (application_version IS NULL AND ? = ( + SELECT version_name FROM "%s".application_versions + ORDER BY version_timestamp DESC LIMIT 1))) """ - .formatted(ctx.schema()); + .formatted(ctx.schema(), ctx.schema()); if (partitionKey != null) { query += " AND queue_partition_key = ?"; } @@ -164,8 +168,9 @@ SELECT COUNT(*) ps.setString(1, queue.name()); ps.setString(2, WorkflowState.ENQUEUED.name()); ps.setString(3, appVersion); + ps.setString(4, appVersion); if (partitionKey != null) { - ps.setString(4, partitionKey); + ps.setString(5, partitionKey); } try (ResultSet rs = ps.executeQuery()) { diff --git a/transact/src/test/java/dev/dbos/transact/client/ClientTest.java b/transact/src/test/java/dev/dbos/transact/client/ClientTest.java index 744377d4..56ddf9dc 100644 --- a/transact/src/test/java/dev/dbos/transact/client/ClientTest.java +++ b/transact/src/test/java/dev/dbos/transact/client/ClientTest.java @@ -9,6 +9,7 @@ import dev.dbos.transact.DBOS; import dev.dbos.transact.DBOSClient; import dev.dbos.transact.DBOSTestAccess; +import dev.dbos.transact.StartWorkflowOptions; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.exceptions.DBOSAwaitedWorkflowCancelledException; import dev.dbos.transact.exceptions.DBOSNonExistentWorkflowException; @@ -508,6 +509,55 @@ public void authFlowsFromEnqueuedParentToChild() throws Exception { } } + @Test + public void versionlessWorkflowDequeuedOnlyWhenWorkerIsLatest() throws Exception { + // The worker runs app version "v1.0.0" (set in beforeEach), registered at launch. + var sysdb = DBOSTestAccess.getSystemDatabase(dbos); + String workerVersion = DBOSTestAccess.getDbosExecutor(dbos).appVersion(); + assertEquals("v1.0.0", workerVersion); + + // Register a newer version so this worker is NOT running the latest registered version. + sysdb.createApplicationVersion("v2.0.0"); + sysdb.updateApplicationVersionTimestamp("v2.0.0", Instant.now().plus(Duration.ofHours(1))); + assertEquals("v2.0.0", sysdb.getLatestApplicationVersion().versionName()); + + // Enqueue a version-less workflow via the client (application_version stays NULL). + String versionlessId = "versionless-" + UUID.randomUUID(); + try (var client = pgContainer.dbosClient()) { + var options = + new DBOSClient.EnqueueOptions("enqueueTest", "ClientServiceImpl", "testQueue") + .withWorkflowId(versionlessId); + client.enqueueWorkflow(options, new Object[] {1, "versionless"}); + } + var versionlessHandle = dbos.retrieveWorkflow(versionlessId); + var versionlessRow = DBUtils.getWorkflowRow(dataSource, versionlessId); + assertEquals(WorkflowState.ENQUEUED.name(), versionlessRow.status()); + assertNull(versionlessRow.applicationVersion()); + + // Enqueue a workflow tagged with the worker's current version via the in-process API. + String taggedId = "tagged-" + UUID.randomUUID(); + var taggedHandle = + dbos.startWorkflow( + () -> service.enqueueTest(2, "tagged"), + new StartWorkflowOptions(taggedId).withQueue("testQueue")); + + // The version-matched workflow is dequeued and completes even though the worker is not latest. + assertEquals("2-tagged", taggedHandle.getResult()); + assertEquals(workerVersion, DBUtils.getWorkflowRow(dataSource, taggedId).applicationVersion()); + + // The version-less workflow must NOT be dequeued while the worker is not the latest version. + Thread.sleep(2000); // allow several queue poll cycles to run + assertEquals(WorkflowState.ENQUEUED, versionlessHandle.getStatus().status()); + assertNull(DBUtils.getWorkflowRow(dataSource, versionlessId).applicationVersion()); + + // Promote the worker's version to latest; the version-less workflow should now run. + sysdb.updateApplicationVersionTimestamp(workerVersion, Instant.now().plus(Duration.ofHours(2))); + assertEquals(workerVersion, sysdb.getLatestApplicationVersion().versionName()); + + assertEquals("1-versionless", versionlessHandle.getResult()); + assertEquals(WorkflowState.SUCCESS, versionlessHandle.getStatus().status()); + } + @Test public void enqueueOptionsNoTimeoutOrDeadlineWrittenToDb() throws Exception { var qs = DBOSTestAccess.getQueueService(dbos); From d18f41b20c83b640b71f7c0082a154214029e496 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 1 Jul 2026 11:49:16 -0700 Subject: [PATCH 2/5] nit --- .../main/java/dev/dbos/transact/database/dao/QueuesDAO.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java b/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java index dcf6848a..5357c4e7 100644 --- a/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java @@ -139,14 +139,14 @@ SELECT COUNT(*) var query = """ SELECT workflow_uuid - FROM "%s".workflow_status + FROM "%1$s".workflow_status WHERE queue_name = ? AND status = ? AND (application_version = ? OR (application_version IS NULL AND ? = ( - SELECT version_name FROM "%s".application_versions + SELECT version_name FROM "%1$s".application_versions ORDER BY version_timestamp DESC LIMIT 1))) """ - .formatted(ctx.schema(), ctx.schema()); + .formatted(ctx.schema()); if (partitionKey != null) { query += " AND queue_partition_key = ?"; } From 5cc037f26b86c4b5c9098d6568de2ef30c1061e4 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 1 Jul 2026 15:39:28 -0700 Subject: [PATCH 3/5] split queries --- .../dbos/transact/database/dao/QueuesDAO.java | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java b/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java index 5357c4e7..d248107c 100644 --- a/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java @@ -136,17 +136,34 @@ SELECT COUNT(*) // Version-less workflows (application_version IS NULL) are only dequeued // when this worker is running the latest registered application version. + boolean isLatestVersion = true; + String latestVersionQuery = + """ + SELECT version_name FROM "%1$s".application_versions + ORDER BY version_timestamp DESC LIMIT 1 + """ + .formatted(ctx.schema()); + try (var ps = connection.prepareStatement(latestVersionQuery); + ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + isLatestVersion = rs.getString(1).equals(appVersion); + } + } + + String versionClause = + isLatestVersion + ? "(application_version = ? OR application_version IS NULL)" + : "application_version = ?"; + var query = """ SELECT workflow_uuid FROM "%1$s".workflow_status WHERE queue_name = ? AND status = ? - AND (application_version = ? OR (application_version IS NULL AND ? = ( - SELECT version_name FROM "%1$s".application_versions - ORDER BY version_timestamp DESC LIMIT 1))) + AND %2$s """ - .formatted(ctx.schema()); + .formatted(ctx.schema(), versionClause); if (partitionKey != null) { query += " AND queue_partition_key = ?"; } From d47e6452de36269a4f0b0bcccbff05d523060a46 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 1 Jul 2026 15:54:42 -0700 Subject: [PATCH 4/5] nit --- .../java/dev/dbos/transact/database/dao/QueuesDAO.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java b/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java index d248107c..3ff6cd26 100644 --- a/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java @@ -158,10 +158,10 @@ SELECT COUNT(*) var query = """ SELECT workflow_uuid - FROM "%1$s".workflow_status + FROM "%s".workflow_status WHERE queue_name = ? AND status = ? - AND %2$s + AND %s """ .formatted(ctx.schema(), versionClause); if (partitionKey != null) { @@ -185,9 +185,8 @@ SELECT COUNT(*) ps.setString(1, queue.name()); ps.setString(2, WorkflowState.ENQUEUED.name()); ps.setString(3, appVersion); - ps.setString(4, appVersion); if (partitionKey != null) { - ps.setString(5, partitionKey); + ps.setString(4, partitionKey); } try (ResultSet rs = ps.executeQuery()) { From ba153918e14b88abc1d2c5ba436eec2ffa462812 Mon Sep 17 00:00:00 2001 From: maxdml Date: Wed, 1 Jul 2026 15:55:23 -0700 Subject: [PATCH 5/5] nit --- .../src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java b/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java index 3ff6cd26..91e9aecc 100644 --- a/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java @@ -139,7 +139,7 @@ SELECT COUNT(*) boolean isLatestVersion = true; String latestVersionQuery = """ - SELECT version_name FROM "%1$s".application_versions + SELECT version_name FROM "%s".application_versions ORDER BY version_timestamp DESC LIMIT 1 """ .formatted(ctx.schema());