diff --git a/transact/src/main/java/dev/dbos/transact/DBOS.java b/transact/src/main/java/dev/dbos/transact/DBOS.java
index 6645266e..e00c5dcd 100644
--- a/transact/src/main/java/dev/dbos/transact/DBOS.java
+++ b/transact/src/main/java/dev/dbos/transact/DBOS.java
@@ -14,6 +14,8 @@
import dev.dbos.transact.workflow.ForkOptions;
import dev.dbos.transact.workflow.ListWorkflowsInput;
import dev.dbos.transact.workflow.Queue;
+import dev.dbos.transact.workflow.QueueConflictResolution;
+import dev.dbos.transact.workflow.QueueOptions;
import dev.dbos.transact.workflow.ScheduleStatus;
import dev.dbos.transact.workflow.SerializationStrategy;
import dev.dbos.transact.workflow.Step;
@@ -164,6 +166,76 @@ public void registerQueues(@NonNull Queue... queues) {
}
}
+ /**
+ * Register a database-backed dynamic queue. Must be called after launch. Queue configuration can
+ * be updated at runtime via {@link #updateQueue(String, QueueOptions)}.
+ *
+ *
Uses {@link QueueConflictResolution#UPDATE_IF_LATEST_VERSION} by default: the existing
+ * configuration is overwritten only if this executor is running the latest application version.
+ *
+ * @param name Queue name
+ * @param options Initial configuration options
+ */
+ public void registerQueue(@NonNull String name, @NonNull QueueOptions options) {
+ ensureLaunched("registerQueue")
+ .registerDynamicQueue(name, options, QueueConflictResolution.UPDATE_IF_LATEST_VERSION);
+ }
+
+ /**
+ * Register a database-backed dynamic queue. Must be called after launch. Queue configuration can
+ * be updated at runtime via {@link #updateQueue(String, QueueOptions)}.
+ *
+ * @param name Queue name
+ * @param options Initial configuration options
+ * @param onConflict How to handle an existing queue with the same name
+ */
+ public void registerQueue(
+ @NonNull String name,
+ @NonNull QueueOptions options,
+ @NonNull QueueConflictResolution onConflict) {
+ ensureLaunched("registerQueue").registerDynamicQueue(name, options, onConflict);
+ }
+
+ /**
+ * Update the configuration of a database-backed dynamic queue. Must be called after launch. Only
+ * fields that are present in {@code options} are written; absent fields are left unchanged.
+ *
+ * @param name Queue name
+ * @param options Fields to update
+ */
+ public void updateQueue(@NonNull String name, @NonNull QueueOptions options) {
+ ensureLaunched("updateQueue").updateDynamicQueue(name, options);
+ }
+
+ /**
+ * Retrieve a database-backed dynamic queue by name. Must be called after launch.
+ *
+ * @param name Queue name
+ * @return the queue if it exists in the database, or empty
+ */
+ public @NonNull Optional findQueue(@NonNull String name) {
+ return ensureLaunched("findQueue").findDynamicQueue(name);
+ }
+
+ /**
+ * Delete a database-backed dynamic queue. Must be called after launch.
+ *
+ * @param name Queue name
+ * @return true if the queue was deleted, false if it did not exist
+ */
+ public boolean deleteQueue(@NonNull String name) {
+ return ensureLaunched("deleteQueue").deleteDynamicQueue(name);
+ }
+
+ /**
+ * List all database-backed dynamic queues. Must be called after launch.
+ *
+ * @return list of all queues currently registered in the database
+ */
+ public @NonNull List listQueues() {
+ return ensureLaunched("listQueues").listDynamicQueues();
+ }
+
/**
* Register all workflows and steps in the provided class instance
*
@@ -311,7 +383,7 @@ private DBOSExecutor ensureLaunched(String caller) {
* @return Optional containing the queue definition for given `queueName`, or empty if not found
*/
public @NonNull Optional getQueue(@NonNull String queueName) {
- return ensureLaunched("getQueue").getQueue(queueName);
+ return ensureLaunched("getQueue").findStaticQueue(queueName);
}
/**
diff --git a/transact/src/main/java/dev/dbos/transact/DBOSClient.java b/transact/src/main/java/dev/dbos/transact/DBOSClient.java
index 01fdfed2..a011b206 100644
--- a/transact/src/main/java/dev/dbos/transact/DBOSClient.java
+++ b/transact/src/main/java/dev/dbos/transact/DBOSClient.java
@@ -13,6 +13,9 @@
import dev.dbos.transact.json.SerializationUtil;
import dev.dbos.transact.workflow.ForkOptions;
import dev.dbos.transact.workflow.ListWorkflowsInput;
+import dev.dbos.transact.workflow.Queue;
+import dev.dbos.transact.workflow.QueueConflictResolution;
+import dev.dbos.transact.workflow.QueueOptions;
import dev.dbos.transact.workflow.ScheduleStatus;
import dev.dbos.transact.workflow.SerializationStrategy;
import dev.dbos.transact.workflow.StepInfo;
@@ -1097,4 +1100,78 @@ public void setWorkflowDelay(@NonNull String workflowId, @NonNull Instant delayU
Objects.requireNonNull(delayUntil, "delayUntil must not be null"));
systemDatabase.setWorkflowDelay(workflowId, wfDelay);
}
+
+ /**
+ * Register a database-backed dynamic queue. Uses {@link QueueConflictResolution#ALWAYS_UPDATE} by
+ * default.
+ *
+ * @param name Queue name
+ * @param options Configuration options
+ */
+ public void registerQueue(@NonNull String name, @NonNull QueueOptions options) {
+ registerQueue(name, options, QueueConflictResolution.ALWAYS_UPDATE);
+ }
+
+ /**
+ * Register a database-backed dynamic queue.
+ *
+ * {@link QueueConflictResolution#UPDATE_IF_LATEST_VERSION} is not supported by {@code
+ * DBOSClient} because clients are not associated with an application version. Use {@link
+ * QueueConflictResolution#ALWAYS_UPDATE} or {@link QueueConflictResolution#NEVER_UPDATE}.
+ *
+ * @param name Queue name
+ * @param options Configuration options
+ * @param onConflict How to handle an existing queue with the same name
+ */
+ public void registerQueue(
+ @NonNull String name,
+ @NonNull QueueOptions options,
+ @NonNull QueueConflictResolution onConflict) {
+ if (onConflict == QueueConflictResolution.UPDATE_IF_LATEST_VERSION) {
+ throw new IllegalArgumentException(
+ "DBOSClient.registerQueue does not support UPDATE_IF_LATEST_VERSION because clients are"
+ + " not associated with an application version. Use ALWAYS_UPDATE or NEVER_UPDATE.");
+ }
+ systemDatabase.upsertQueue(name, options, onConflict == QueueConflictResolution.ALWAYS_UPDATE);
+ }
+
+ /**
+ * Update the configuration of a database-backed dynamic queue. Only fields that are present in
+ * {@code options} are written; absent fields are left unchanged.
+ *
+ * @param name Queue name
+ * @param options Fields to update
+ */
+ public void updateQueue(@NonNull String name, @NonNull QueueOptions options) {
+ systemDatabase.updateQueue(name, options);
+ }
+
+ /**
+ * Retrieve a database-backed dynamic queue by name.
+ *
+ * @param name Queue name
+ * @return the queue if it exists in the database, or empty
+ */
+ public @NonNull Optional findQueue(@NonNull String name) {
+ return systemDatabase.findQueue(name);
+ }
+
+ /**
+ * List all database-backed dynamic queues.
+ *
+ * @return list of all queues currently registered in the database
+ */
+ public @NonNull List listQueues() {
+ return systemDatabase.listQueues();
+ }
+
+ /**
+ * Delete a database-backed dynamic queue.
+ *
+ * @param name Queue name
+ * @return true if the queue was deleted, false if it did not exist
+ */
+ public boolean deleteQueue(@NonNull String name) {
+ return systemDatabase.deleteQueue(name);
+ }
}
diff --git a/transact/src/main/java/dev/dbos/transact/admin/AdminServer.java b/transact/src/main/java/dev/dbos/transact/admin/AdminServer.java
index 1c1ebb29..2819c92a 100644
--- a/transact/src/main/java/dev/dbos/transact/admin/AdminServer.java
+++ b/transact/src/main/java/dev/dbos/transact/admin/AdminServer.java
@@ -140,7 +140,7 @@ private void deactivate(HttpExchange exchange) throws IOException {
}
private void workflowQueuesMetadata(HttpExchange exchange) throws IOException {
- var queues = dbosExecutor.getQueues();
+ var queues = dbosExecutor.getStaticQueues();
sendMappedJson(exchange, 200, queues);
}
diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java
index 40be94cb..e7e2569a 100644
--- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java
+++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java
@@ -6,6 +6,7 @@
import dev.dbos.transact.workflow.ExportedWorkflow;
import dev.dbos.transact.workflow.ForkOptions;
import dev.dbos.transact.workflow.ListWorkflowsInput;
+import dev.dbos.transact.workflow.Queue;
import dev.dbos.transact.workflow.StepInfo;
import dev.dbos.transact.workflow.WorkflowHandle;
import dev.dbos.transact.workflow.WorkflowSchedule;
@@ -702,6 +703,7 @@ CompletableFuture getResponseAsync(BaseMessage message, WebSocket
case EXPORT_WORKFLOW -> handleExportWorkflow(this, message, ws);
case FORK_WORKFLOW -> handleFork(this, message);
case GET_METRICS -> handleGetMetrics(this, message);
+ case GET_QUEUE -> handleGetQueue(this, message);
case GET_SCHEDULE -> handleGetSchedule(this, message);
case GET_WORKFLOW_AGGREGATES -> handleGetWorkflowAggregates(this, message);
case GET_WORKFLOW_EVENTS -> handleGetWorkflowEvents(this, message);
@@ -711,6 +713,7 @@ CompletableFuture getResponseAsync(BaseMessage message, WebSocket
case IMPORT_WORKFLOW -> handleImportWorkflow(this, message);
case LIST_APPLICATION_VERSIONS -> handleListApplicationVersions(this, message);
case LIST_QUEUED_WORKFLOWS -> handleListQueuedWorkflows(this, message);
+ case LIST_QUEUES -> handleListQueues(this, message);
case LIST_SCHEDULES -> handleListSchedules(this, message);
case LIST_STEPS -> handleListSteps(this, message);
case LIST_WORKFLOWS -> handleListWorkflows(this, message);
@@ -1371,6 +1374,35 @@ static CompletableFuture handleGetSchedule(
});
}
+ static CompletableFuture handleListQueues(
+ Conductor conductor, BaseMessage message) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ List queues = conductor.systemDatabase.listQueues();
+ List output = queues.stream().map(QueueOutput::from).toList();
+ return new ListQueuesResponse(message, output);
+ } catch (Exception e) {
+ logger.error("Exception encountered when listing queues", e);
+ return new ListQueuesResponse(message, e.getMessage());
+ }
+ });
+ }
+
+ static CompletableFuture handleGetQueue(Conductor conductor, BaseMessage message) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ GetQueueRequest request = (GetQueueRequest) message;
+ try {
+ var queue = conductor.systemDatabase.findQueue(request.name);
+ return new GetQueueResponse(request, queue.map(QueueOutput::from).orElse(null));
+ } catch (Exception e) {
+ logger.error("Exception encountered when getting queue {}", request.name, e);
+ return new GetQueueResponse(request, e.getMessage());
+ }
+ });
+ }
+
static CompletableFuture handlePauseSchedule(
Conductor conductor, BaseMessage message) {
return CompletableFuture.supplyAsync(
diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java
index 41835223..97fb7e6e 100644
--- a/transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java
+++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java
@@ -19,6 +19,7 @@
@JsonSubTypes.Type(value = ExportWorkflowRequest.class, name = "export_workflow"),
@JsonSubTypes.Type(value = ForkWorkflowRequest.class, name = "fork_workflow"),
@JsonSubTypes.Type(value = GetMetricsRequest.class, name = "get_metrics"),
+ @JsonSubTypes.Type(value = GetQueueRequest.class, name = "get_queue"),
@JsonSubTypes.Type(value = GetScheduleRequest.class, name = "get_schedule"),
@JsonSubTypes.Type(value = GetWorkflowAggregatesRequest.class, name = "get_workflow_aggregates"),
@JsonSubTypes.Type(value = GetWorkflowEventsRequest.class, name = "get_workflow_events"),
@@ -32,6 +33,7 @@
value = ListApplicationVersionsRequest.class,
name = "list_application_versions"),
@JsonSubTypes.Type(value = ListQueuedWorkflowsRequest.class, name = "list_queued_workflows"),
+ @JsonSubTypes.Type(value = ListQueuesRequest.class, name = "list_queues"),
@JsonSubTypes.Type(value = ListSchedulesRequest.class, name = "list_schedules"),
@JsonSubTypes.Type(value = ListStepsRequest.class, name = "list_steps"),
@JsonSubTypes.Type(value = ListWorkflowsRequest.class, name = "list_workflows"),
diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/GetQueueRequest.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/GetQueueRequest.java
new file mode 100644
index 00000000..69da526f
--- /dev/null
+++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/GetQueueRequest.java
@@ -0,0 +1,10 @@
+package dev.dbos.transact.conductor.protocol;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class GetQueueRequest extends BaseMessage {
+ public String name;
+
+ public GetQueueRequest() {}
+}
diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/GetQueueResponse.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/GetQueueResponse.java
new file mode 100644
index 00000000..a31cca2f
--- /dev/null
+++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/GetQueueResponse.java
@@ -0,0 +1,17 @@
+package dev.dbos.transact.conductor.protocol;
+
+public class GetQueueResponse extends BaseResponse {
+ public QueueOutput output;
+
+ public GetQueueResponse() {}
+
+ public GetQueueResponse(BaseMessage message, QueueOutput output) {
+ super(message.type, message.request_id);
+ this.output = output;
+ }
+
+ public GetQueueResponse(BaseMessage message, String errorMessage) {
+ super(message.type, message.request_id, errorMessage);
+ this.output = null;
+ }
+}
diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuesRequest.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuesRequest.java
new file mode 100644
index 00000000..8e787ecf
--- /dev/null
+++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuesRequest.java
@@ -0,0 +1,8 @@
+package dev.dbos.transact.conductor.protocol;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ListQueuesRequest extends BaseMessage {
+ public ListQueuesRequest() {}
+}
diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuesResponse.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuesResponse.java
new file mode 100644
index 00000000..ed45fe47
--- /dev/null
+++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuesResponse.java
@@ -0,0 +1,20 @@
+package dev.dbos.transact.conductor.protocol;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ListQueuesResponse extends BaseResponse {
+ public List output;
+
+ public ListQueuesResponse() {}
+
+ public ListQueuesResponse(BaseMessage message, List output) {
+ super(message.type, message.request_id);
+ this.output = output;
+ }
+
+ public ListQueuesResponse(BaseMessage message, String errorMessage) {
+ super(message.type, message.request_id, errorMessage);
+ this.output = Collections.emptyList();
+ }
+}
diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java
index a07463ff..957e351e 100644
--- a/transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java
+++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java
@@ -10,6 +10,7 @@ public enum MessageType {
EXPORT_WORKFLOW("export_workflow"),
FORK_WORKFLOW("fork_workflow"),
GET_METRICS("get_metrics"),
+ GET_QUEUE("get_queue"),
GET_SCHEDULE("get_schedule"),
GET_WORKFLOW_AGGREGATES("get_workflow_aggregates"),
GET_WORKFLOW_EVENTS("get_workflow_events"),
@@ -19,6 +20,7 @@ public enum MessageType {
IMPORT_WORKFLOW("import_workflow"),
LIST_APPLICATION_VERSIONS("list_application_versions"),
LIST_QUEUED_WORKFLOWS("list_queued_workflows"),
+ LIST_QUEUES("list_queues"),
LIST_SCHEDULES("list_schedules"),
LIST_STEPS("list_steps"),
LIST_WORKFLOWS("list_workflows"),
diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/QueueOutput.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/QueueOutput.java
new file mode 100644
index 00000000..f8c97dee
--- /dev/null
+++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/QueueOutput.java
@@ -0,0 +1,27 @@
+package dev.dbos.transact.conductor.protocol;
+
+import dev.dbos.transact.workflow.Queue;
+
+public record QueueOutput(
+ String name,
+ Integer concurrency,
+ Integer worker_concurrency,
+ Integer rate_limit_max,
+ Double rate_limit_period_sec,
+ boolean priority_enabled,
+ boolean partition_queue,
+ double polling_interval_sec) {
+
+ public static QueueOutput from(Queue q) {
+ Queue.RateLimit rl = q.rateLimit();
+ return new QueueOutput(
+ q.name(),
+ q.concurrency(),
+ q.workerConcurrency(),
+ rl != null ? rl.limit() : null,
+ rl != null ? rl.period().toMillis() / 1000.0 : null,
+ q.priorityEnabled(),
+ q.partitioningEnabled(),
+ q.pollingInterval().toMillis() / 1000.0);
+ }
+}
diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java
index b0d100f3..0e01b1a6 100644
--- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java
+++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java
@@ -22,6 +22,7 @@
import dev.dbos.transact.workflow.ListWorkflowsInput;
import dev.dbos.transact.workflow.NotificationInfo;
import dev.dbos.transact.workflow.Queue;
+import dev.dbos.transact.workflow.QueueOptions;
import dev.dbos.transact.workflow.ScheduleStatus;
import dev.dbos.transact.workflow.StepInfo;
import dev.dbos.transact.workflow.VersionInfo;
@@ -381,6 +382,30 @@ public List getQueuePartitions(String queueName) {
return dbRetry(() -> QueuesDAO.getQueuePartitions(ctx, queueName));
}
+ public boolean upsertQueue(String name, QueueOptions options, boolean updateExisting) {
+ if (Constants.DBOS_INTERNAL_QUEUE.equals(name)) {
+ throw new IllegalArgumentException(
+ String.format("%s is a reserved queue name", Constants.DBOS_INTERNAL_QUEUE));
+ }
+ return dbRetry(() -> QueuesDAO.upsertQueue(ctx, name, options, updateExisting));
+ }
+
+ public void updateQueue(String name, QueueOptions update) {
+ dbRetry(() -> QueuesDAO.updateQueue(ctx, name, update));
+ }
+
+ public Optional findQueue(String name) {
+ return dbRetry(() -> QueuesDAO.findQueue(ctx, name));
+ }
+
+ public List listQueues() {
+ return dbRetry(() -> QueuesDAO.listQueues(ctx));
+ }
+
+ public boolean deleteQueue(String name) {
+ return dbRetry(() -> QueuesDAO.deleteQueue(ctx, name));
+ }
+
public StepResult checkStepResult(String workflowId, int functionId, String functionName) {
return dbRetry(
diff --git a/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java b/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java
index f1409f99..8dbeadc0 100644
--- a/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java
+++ b/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java
@@ -1,7 +1,9 @@
package dev.dbos.transact.database.dao;
import dev.dbos.transact.database.DbContext;
+import dev.dbos.transact.workflow.Field;
import dev.dbos.transact.workflow.Queue;
+import dev.dbos.transact.workflow.QueueOptions;
import dev.dbos.transact.workflow.WorkflowState;
import java.sql.Connection;
@@ -9,11 +11,13 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -293,4 +297,267 @@ public static List getQueuePartitions(DbContext ctx, String queueName)
}
}
}
+
+ /**
+ * Upsert a queue row. Returns true iff a new row was inserted (i.e. the queue did not previously
+ * exist). Returns false if the row already existed, regardless of whether it was updated.
+ */
+ public static boolean upsertQueue(
+ DbContext ctx, String name, QueueOptions options, boolean updateExisting)
+ throws SQLException {
+ Queue queue = queueFromOptions(name, options);
+ final String insertSql =
+ """
+ INSERT INTO "%s".queues
+ (name, concurrency, worker_concurrency, rate_limit_max, rate_limit_period_sec,
+ priority_enabled, partition_queue, polling_interval_sec, updated_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+ ON CONFLICT (name) DO NOTHING
+ """
+ .formatted(ctx.schema());
+ final String updateSql =
+ """
+ UPDATE "%s".queues SET
+ concurrency = ?,
+ worker_concurrency = ?,
+ rate_limit_max = ?,
+ rate_limit_period_sec = ?,
+ priority_enabled = ?,
+ partition_queue = ?,
+ polling_interval_sec = ?,
+ updated_at = ?
+ WHERE name = ?
+ """
+ .formatted(ctx.schema());
+
+ try (Connection connection = ctx.getConnection()) {
+ boolean inserted;
+ try (PreparedStatement ps = connection.prepareStatement(insertSql)) {
+ bindQueueParams(ps, queue, 1);
+ inserted = ps.executeUpdate() == 1;
+ }
+ if (!inserted && updateExisting) {
+ try (PreparedStatement ps = connection.prepareStatement(updateSql)) {
+ setNullableInt(ps, 1, queue.concurrency());
+ setNullableInt(ps, 2, queue.workerConcurrency());
+ var rateLimit = queue.rateLimit();
+ if (rateLimit != null) {
+ ps.setInt(3, rateLimit.limit());
+ ps.setDouble(4, rateLimit.period().toMillis() / 1000.0);
+ } else {
+ ps.setNull(3, java.sql.Types.INTEGER);
+ ps.setNull(4, java.sql.Types.DOUBLE);
+ }
+ ps.setBoolean(5, queue.priorityEnabled());
+ ps.setBoolean(6, queue.partitioningEnabled());
+ ps.setDouble(7, queue.pollingInterval().toMillis() / 1000.0);
+ ps.setLong(8, System.currentTimeMillis());
+ ps.setString(9, queue.name());
+ ps.executeUpdate();
+ }
+ }
+ return inserted;
+ }
+ }
+
+ private static void bindQueueParams(PreparedStatement ps, Queue queue, int offset)
+ throws SQLException {
+ ps.setString(offset, queue.name());
+ setNullableInt(ps, offset + 1, queue.concurrency());
+ setNullableInt(ps, offset + 2, queue.workerConcurrency());
+ var rateLimit = queue.rateLimit();
+ if (rateLimit != null) {
+ ps.setInt(offset + 3, rateLimit.limit());
+ ps.setDouble(offset + 4, rateLimit.period().toMillis() / 1000.0);
+ } else {
+ ps.setNull(offset + 3, java.sql.Types.INTEGER);
+ ps.setNull(offset + 4, java.sql.Types.DOUBLE);
+ }
+ ps.setBoolean(offset + 5, queue.priorityEnabled());
+ ps.setBoolean(offset + 6, queue.partitioningEnabled());
+ ps.setDouble(offset + 7, queue.pollingInterval().toMillis() / 1000.0);
+ ps.setLong(offset + 8, System.currentTimeMillis());
+ }
+
+ public static Optional findQueue(DbContext ctx, String name) throws SQLException {
+ final String sql =
+ """
+ SELECT name, concurrency, worker_concurrency,
+ rate_limit_max, rate_limit_period_sec,
+ priority_enabled, partition_queue, polling_interval_sec
+ FROM "%s".queues
+ WHERE name = ?
+ """
+ .formatted(ctx.schema());
+
+ try (Connection connection = ctx.getConnection();
+ PreparedStatement stmt = connection.prepareStatement(sql)) {
+ stmt.setString(1, name);
+ try (ResultSet rs = stmt.executeQuery()) {
+ if (rs.next()) {
+ return Optional.of(queueFromResultSet(rs));
+ }
+ return Optional.empty();
+ }
+ }
+ }
+
+ public static List listQueues(DbContext ctx) throws SQLException {
+ final String sql =
+ """
+ SELECT name, concurrency, worker_concurrency,
+ rate_limit_max, rate_limit_period_sec,
+ priority_enabled, partition_queue, polling_interval_sec
+ FROM "%s".queues
+ ORDER BY name
+ """
+ .formatted(ctx.schema());
+
+ try (Connection connection = ctx.getConnection();
+ PreparedStatement stmt = connection.prepareStatement(sql);
+ ResultSet rs = stmt.executeQuery()) {
+ List queues = new ArrayList<>();
+ while (rs.next()) {
+ queues.add(queueFromResultSet(rs));
+ }
+ return queues;
+ }
+ }
+
+ public static void updateQueue(DbContext ctx, String name, QueueOptions update)
+ throws SQLException {
+ if (update.isEmpty()) return;
+
+ List setClauses = new ArrayList<>();
+ List params = new ArrayList<>();
+
+ collectField(setClauses, params, "concurrency", update.concurrency());
+ collectField(setClauses, params, "worker_concurrency", update.workerConcurrency());
+ collectField(setClauses, params, "rate_limit_max", update.rateLimitMax());
+ collectField(
+ setClauses, params, "rate_limit_period_sec", durationToSec(update.rateLimitPeriod()));
+ collectOptional(setClauses, params, "priority_enabled", update.priorityEnabled());
+ collectOptional(setClauses, params, "partition_queue", update.partitionQueue());
+ collectOptional(
+ setClauses, params, "polling_interval_sec", durationToSec(update.pollingInterval()));
+
+ setClauses.add("\"updated_at\" = ?");
+ params.add(System.currentTimeMillis());
+ params.add(name);
+
+ String sql =
+ "UPDATE \"%s\".queues SET %s WHERE name = ?"
+ .formatted(ctx.schema(), String.join(", ", setClauses));
+
+ try (Connection connection = ctx.getConnection();
+ PreparedStatement ps = connection.prepareStatement(sql)) {
+ for (int i = 0; i < params.size(); i++) {
+ ps.setObject(i + 1, params.get(i));
+ }
+ ps.executeUpdate();
+ }
+ }
+
+ private static void collectField(
+ List clauses, List params, String column, Field field) {
+ if (field.isPresent()) {
+ clauses.add("\"" + column + "\" = ?");
+ params.add(field.get());
+ }
+ }
+
+ private static void collectOptional(
+ List clauses, List params, String column, Optional opt) {
+ opt.ifPresent(
+ value -> {
+ clauses.add("\"" + column + "\" = ?");
+ params.add(value);
+ });
+ }
+
+ private static Field durationToSec(Field field) {
+ if (!field.isPresent()) return Field.absent();
+ Duration d = field.get();
+ return Field.of(d != null ? d.toMillis() / 1000.0 : null);
+ }
+
+ private static Optional durationToSec(Optional opt) {
+ return opt.map(d -> d.toMillis() / 1000.0);
+ }
+
+ public static boolean deleteQueue(DbContext ctx, String name) throws SQLException {
+ final String sql = "DELETE FROM \"%s\".queues WHERE name = ?".formatted(ctx.schema());
+
+ try (Connection connection = ctx.getConnection();
+ PreparedStatement stmt = connection.prepareStatement(sql)) {
+ stmt.setString(1, name);
+ return stmt.executeUpdate() > 0;
+ }
+ }
+
+ private static Queue queueFromResultSet(ResultSet rs) throws SQLException {
+ String name = rs.getString("name");
+ Integer concurrency = rs.getObject("concurrency", Integer.class);
+ Integer workerConcurrency = rs.getObject("worker_concurrency", Integer.class);
+ Integer rateLimitMax = rs.getObject("rate_limit_max", Integer.class);
+ Double rateLimitPeriodSec = rs.getObject("rate_limit_period_sec", Double.class);
+ boolean priorityEnabled = rs.getBoolean("priority_enabled");
+ boolean partitioningEnabled = rs.getBoolean("partition_queue");
+ Double pollingIntervalSec = rs.getObject("polling_interval_sec", Double.class);
+
+ Queue.RateLimit rateLimit = null;
+ if (rateLimitMax != null && rateLimitPeriodSec != null) {
+ rateLimit =
+ new Queue.RateLimit(rateLimitMax, Duration.ofMillis((long) (rateLimitPeriodSec * 1000)));
+ }
+ Duration pollingInterval =
+ pollingIntervalSec != null
+ ? Duration.ofMillis((long) (pollingIntervalSec * 1000))
+ : Queue.DEFAULT_POLLING_INTERVAL;
+ return new Queue(
+ name,
+ concurrency,
+ workerConcurrency,
+ priorityEnabled,
+ partitioningEnabled,
+ rateLimit,
+ pollingInterval);
+ }
+
+ private static Queue queueFromOptions(String name, QueueOptions options) {
+ Integer concurrencyVal = options.concurrency().isPresent() ? options.concurrency().get() : null;
+ Integer workerConcurrencyVal =
+ options.workerConcurrency().isPresent() ? options.workerConcurrency().get() : null;
+ boolean priorityEnabledVal = options.priorityEnabled().orElse(false);
+ boolean partitionQueueVal = options.partitionQueue().orElse(false);
+
+ Queue.RateLimit rateLimit = null;
+ if (options.rateLimitMax().isPresent()
+ && options.rateLimitPeriod().isPresent()
+ && options.rateLimitMax().get() != null
+ && options.rateLimitPeriod().get() != null) {
+ rateLimit =
+ new Queue.RateLimit(options.rateLimitMax().get(), options.rateLimitPeriod().get());
+ }
+
+ Duration pollingIntervalVal = options.pollingInterval().orElse(Queue.DEFAULT_POLLING_INTERVAL);
+
+ return new Queue(
+ name,
+ concurrencyVal,
+ workerConcurrencyVal,
+ priorityEnabledVal,
+ partitionQueueVal,
+ rateLimit,
+ pollingIntervalVal);
+ }
+
+ private static void setNullableInt(PreparedStatement stmt, int index, Integer value)
+ throws SQLException {
+ if (value != null) {
+ stmt.setInt(index, value);
+ } else {
+ stmt.setNull(index, java.sql.Types.INTEGER);
+ }
+ }
}
diff --git a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java
index ba1daa6e..0a5f5f6b 100644
--- a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java
+++ b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java
@@ -30,6 +30,8 @@
import dev.dbos.transact.workflow.ForkOptions;
import dev.dbos.transact.workflow.ListWorkflowsInput;
import dev.dbos.transact.workflow.Queue;
+import dev.dbos.transact.workflow.QueueConflictResolution;
+import dev.dbos.transact.workflow.QueueOptions;
import dev.dbos.transact.workflow.ScheduleStatus;
import dev.dbos.transact.workflow.SerializationStrategy;
import dev.dbos.transact.workflow.StepInfo;
@@ -392,14 +394,48 @@ public Optional getRegisteredWorkflow(
return Optional.ofNullable(this.workflowMap.get(fqName));
}
- public Collection getQueues() {
+ public Optional findQueue(String queueName) {
+ return findStaticQueue(queueName)
+ .or(() -> queueService.findDynamicQueue(queueName))
+ .or(() -> systemDatabase.findQueue(queueName));
+ }
+
+ public Collection getStaticQueues() {
return this.queueMap.values();
}
- public Optional getQueue(String queueName) {
+ public Optional findStaticQueue(String queueName) {
return Optional.ofNullable(this.queueMap.get(queueName));
}
+ public void registerDynamicQueue(
+ String name, QueueOptions options, QueueConflictResolution onConflict) {
+ boolean updateExisting =
+ switch (onConflict) {
+ case ALWAYS_UPDATE -> true;
+ case NEVER_UPDATE -> false;
+ case UPDATE_IF_LATEST_VERSION ->
+ appVersion.equals(systemDatabase.getLatestApplicationVersion().versionName());
+ };
+ systemDatabase.upsertQueue(name, options, updateExisting);
+ }
+
+ public void updateDynamicQueue(String name, QueueOptions options) {
+ systemDatabase.updateQueue(name, options);
+ }
+
+ public Optional findDynamicQueue(String name) {
+ return systemDatabase.findQueue(name);
+ }
+
+ public boolean deleteDynamicQueue(String name) {
+ return systemDatabase.deleteQueue(name);
+ }
+
+ public List listDynamicQueues() {
+ return systemDatabase.listQueues();
+ }
+
public void fireAlertHandler(String name, String message, Map metadata) {
if (alertHandler != null) {
alertHandler.invoke(name, message, metadata);
@@ -1424,7 +1460,7 @@ private void validateWorkflow(String workflowName, String className, String inst
private void validateQueue(String queueName) {
if (queueName != null) {
- getQueue(queueName)
+ findQueue(queueName)
.orElseThrow(
() -> new IllegalStateException("Queue %s is not registered".formatted(queueName)));
}
@@ -1437,7 +1473,7 @@ private void validateQueue(String queueName, String queuePartitionKey) {
"DBOS internal queue is not a partitioned queue, but a partition key was provided");
}
} else {
- var queue = this.getQueue(queueName);
+ var queue = findQueue(queueName);
if (queue.isPresent()) {
if (queue.get().partitioningEnabled() && queuePartitionKey == null) {
throw new IllegalArgumentException(
diff --git a/transact/src/main/java/dev/dbos/transact/execution/QueueService.java b/transact/src/main/java/dev/dbos/transact/execution/QueueService.java
index 71eee21d..50547379 100644
--- a/transact/src/main/java/dev/dbos/transact/execution/QueueService.java
+++ b/transact/src/main/java/dev/dbos/transact/execution/QueueService.java
@@ -6,14 +6,18 @@
import java.time.Duration;
import java.util.Collection;
+import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -21,12 +25,17 @@
public class QueueService implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(QueueService.class);
+ private static final Duration MAX_POLLING_INTERVAL = Duration.ofSeconds(120);
+ private static final long DB_QUEUE_SUPERVISOR_INTERVAL_SEC = 1;
private final AtomicReference execServiceRef = new AtomicReference<>();
private final AtomicBoolean paused = new AtomicBoolean(false);
+ private final Set dbListeningQueues = ConcurrentHashMap.newKeySet();
+ private volatile Map dynamicQueueMap = Map.of();
private final SystemDatabase systemDatabase;
private final DBOSExecutor dbosExecutor;
+ private Set listenQueues;
private double speedup = 1.0;
public QueueService(DBOSExecutor dbosExecutor, SystemDatabase systemDatabase) {
@@ -46,12 +55,18 @@ public void unpause() {
paused.set(false);
}
- public void start(Collection queues, Set listenQueues) {
+ public void start(Collection staticQueues, Set listenQueues) {
if (this.execServiceRef.get() == null) {
var procCount = Runtime.getRuntime().availableProcessors();
var scheduler = Executors.newScheduledThreadPool(procCount);
if (this.execServiceRef.compareAndSet(null, scheduler)) {
- startQueueListeners(queues, listenQueues);
+ this.listenQueues = listenQueues;
+ scheduler.scheduleAtFixedRate(this::transitionDelayedWorkflows, 1, 1, TimeUnit.SECONDS);
+ scheduler.scheduleAtFixedRate(
+ this::pollDynamicQueues, 0, DB_QUEUE_SUPERVISOR_INTERVAL_SEC, TimeUnit.SECONDS);
+ for (var queue : staticQueues) {
+ startQueueListenerIfNeeded(queue, false);
+ }
}
}
}
@@ -69,107 +84,144 @@ public boolean isStopped() {
return this.execServiceRef.get() == null;
}
- private void startQueueListeners(Collection queues, Set listenQueues) {
- logger.debug("startQueueListeners");
+ public Optional findDynamicQueue(String queueName) {
+ return Optional.ofNullable(dynamicQueueMap.get(queueName));
+ }
+
+ private boolean isListening(String queueName) {
+ return queueName.equals(Constants.DBOS_INTERNAL_QUEUE)
+ || listenQueues.isEmpty()
+ || listenQueues.contains(queueName);
+ }
+
+ private void startQueueListenerIfNeeded(Queue queue, boolean dynamic) {
+ if (!isListening(queue.name())) return;
+ if (dynamic && !dbListeningQueues.add(queue.name())) return;
+ if (execServiceRef.get() == null) return;
+
+ new QueueListenerTask(queue, dynamic)
+ .schedule(); // executor holds the reference via the scheduled future
+ }
+
+ // ── Dynamic queue supervisor ──────────────────────────────────────────────
+
+ private void pollDynamicQueues() {
+ try {
+ if (execServiceRef.get() == null) return;
+
+ var dbQueues = systemDatabase.listQueues();
+ dynamicQueueMap =
+ dbQueues.stream().collect(Collectors.toUnmodifiableMap(Queue::name, q -> q));
+ if (logger.isDebugEnabled()) {
+ logger.debug("pollDynamicQueues found {} queues", dbQueues.size());
+ for (var q : dbQueues) {
+ logger.debug(
+ " queue: {} concurrency: {} pollingInterval: {}",
+ q.name(),
+ q.concurrency(),
+ q.pollingInterval());
+ }
+ }
+
+ for (var queue : dbQueues) {
+ if (dbosExecutor.findStaticQueue(queue.name()).isPresent()) {
+ logger.warn(
+ "Database-backed queue {} has the same name as a static queue; "
+ + "the static queue's configuration is being used and the database-backed queue is ignored.",
+ queue.name());
+ continue;
+ }
+ startQueueListenerIfNeeded(queue, true);
+ }
+ } catch (Exception e) {
+ logger.error("pollDynamicQueues failed", e);
+ }
+ }
+
+ // ── Queue listener task ───────────────────────────────────────────────────
+
+ private class QueueListenerTask implements Runnable {
- final var executorId = dbosExecutor.executorId();
- final var appVersion = dbosExecutor.appVersion();
- final Duration minPollingInterval = Duration.ofSeconds(1);
- final Duration maxPollingInterval = Duration.ofSeconds(120);
+ Queue queue;
+ double backoffFactor = 1.0;
+ final boolean dynamic;
+ final String executorId = dbosExecutor.executorId();
+ final String appVersion = dbosExecutor.appVersion();
- var execService = execServiceRef.get();
- if (execService != null) {
- execService.scheduleAtFixedRate(this::transitionDelayedWorkflows, 1, 1, TimeUnit.SECONDS);
+ QueueListenerTask(Queue queue, boolean dynamic) {
+ this.queue = queue;
+ this.dynamic = dynamic;
}
- for (var _queue : queues) {
+ void schedule() {
+ var randomSleepFactor = 0.95 + ThreadLocalRandom.current().nextDouble(0.1);
+ var delayMs =
+ (long) (randomSleepFactor * queue.pollingInterval().toMillis() * backoffFactor * speedup);
+ var svc = execServiceRef.get();
+ if (svc != null) {
+ svc.schedule(this, delayMs, TimeUnit.MILLISECONDS);
+ }
+ }
- var listening =
- _queue.name().equals(Constants.DBOS_INTERNAL_QUEUE)
- || listenQueues.isEmpty()
- || listenQueues.contains(_queue.name());
- if (!listening) {
- continue;
+ private void processPartition(String partition) {
+ var partitionLog = Objects.requireNonNullElse(partition, "");
+ if (!paused.get()) {
+ var workflowIds =
+ systemDatabase.getAndStartQueuedWorkflows(queue, executorId, appVersion, partition);
+ if (!workflowIds.isEmpty()) {
+ logger.debug(
+ "Retrieved {} workflows from {} partition of queue {}",
+ workflowIds.size(),
+ partitionLog,
+ queue.name());
+ }
+ for (var workflowId : workflowIds) {
+ logger.debug(
+ "Starting workflow {} from {} partition of queue {}",
+ workflowId,
+ partitionLog,
+ queue.name());
+ dbosExecutor.executeWorkflowById(workflowId, false, true);
+ }
}
+ }
- var task =
- new Runnable() {
- final Queue queue = _queue;
- Duration pollingInterval = Duration.ofSeconds(1);
-
- public void schedule() {
- var randomSleepFactor = 0.95 + ThreadLocalRandom.current().nextDouble(0.1);
- var delayMs = (long) (randomSleepFactor * pollingInterval.toMillis() * speedup);
- var execService = execServiceRef.get();
- if (execService != null) {
- execService.schedule(this, delayMs, TimeUnit.MILLISECONDS);
- }
- }
-
- private void processPartition(String partition) {
- var partitionLog = Objects.requireNonNullElse(partition, "");
- if (!paused.get()) {
- var workflowIds =
- systemDatabase.getAndStartQueuedWorkflows(
- queue, executorId, appVersion, partition);
- if (workflowIds.size() > 0) {
- logger.debug(
- "Retrieved {} workflows from {} partition of queue {}",
- workflowIds.size(),
- partitionLog,
- queue.name());
- }
- for (var workflowId : workflowIds) {
- logger.debug(
- "Starting workflow {} from {} partition of queue {}",
- workflowId,
- partitionLog,
- queue.name());
- dbosExecutor.executeWorkflowById(workflowId, false, true);
- }
- }
- }
-
- @Override
- public void run() {
- // if scheduler service isn't running, the queue service was stopped so don't start
- // the workflow or schedule the next execution
- if (execServiceRef.get() == null) {
- return;
- }
-
- try {
- if (queue.partitioningEnabled()) {
- var partitions = systemDatabase.getQueuePartitions(queue.name());
- for (var partition : partitions) {
- processPartition(partition);
- }
- } else {
- processPartition(null);
- }
-
- pollingInterval = Duration.ofMillis((long) (pollingInterval.toMillis() * 0.9));
- pollingInterval =
- pollingInterval.compareTo(minPollingInterval) >= 0
- ? pollingInterval
- : minPollingInterval;
- } catch (Exception e) {
- logger.error("Error executing queued workflow(s) for queue {}", queue.name(), e);
- pollingInterval = pollingInterval.multipliedBy(2);
- pollingInterval =
- pollingInterval.compareTo(maxPollingInterval) <= 0
- ? pollingInterval
- : maxPollingInterval;
- } finally {
- this.schedule();
- }
- }
- };
-
- task.schedule();
+ @Override
+ public void run() {
+ if (execServiceRef.get() == null) return;
+ if (dynamic) {
+ var refreshed = systemDatabase.findQueue(queue.name());
+ if (refreshed.isEmpty()) {
+ dbListeningQueues.remove(queue.name());
+ return;
+ }
+ queue = refreshed.get();
+ }
+
+ try {
+ if (queue.partitioningEnabled()) {
+ var partitions = systemDatabase.getQueuePartitions(queue.name());
+ for (var partition : partitions) {
+ processPartition(partition);
+ }
+ } else {
+ processPartition(null);
+ }
+
+ backoffFactor = Math.max(backoffFactor * 0.9, 1.0);
+ } catch (Exception e) {
+ logger.error("Error executing queued workflow(s) for queue {}", queue.name(), e);
+ double maxFactor =
+ (double) MAX_POLLING_INTERVAL.toMillis() / queue.pollingInterval().toMillis();
+ backoffFactor = Math.min(backoffFactor * 2.0, maxFactor);
+ } finally {
+ this.schedule();
+ }
}
}
+ // ── Shared helpers ────────────────────────────────────────────────────────
+
private void transitionDelayedWorkflows() {
if (!paused.get()) {
try {
diff --git a/transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java b/transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java
index f290ac75..1461551b 100644
--- a/transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java
+++ b/transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java
@@ -85,170 +85,168 @@ public void unpause() {
private void pollWorkflowSchedules() {
try {
- pollWorkflowSchedulesImpl();
- } catch (Exception e) {
- // Catch all exceptions to prevent scheduleAtFixedRate from permanently suppressing future
- // poll invocations. A transient DB failure should not permanently disable the scheduler.
- logger.error("pollWorkflowSchedules failed", e);
- }
- }
-
- private void pollWorkflowSchedulesImpl() {
- // if execServiceRef is null, the scheduler service was shut down so don't poll schedules
- if (execServiceRef.get() == null) {
- return;
- }
-
- var schedules = dbosExecutor.listSchedules(null, null, null);
- if (logger.isDebugEnabled()) {
- logger.debug("pollWorkflowSchedules found {} schedules", schedules.size());
- for (var s : schedules) {
- logger.debug(
- " schedule: {} workflow: {} cron: {}", s.scheduleName(), s.workflowName(), s.cron());
+ // if execServiceRef is null, the scheduler service was shut down so don't poll schedules
+ if (execServiceRef.get() == null) {
+ return;
}
- }
- // shut down any scheduled future that isn't in the list of current schedules
- var currentIds = schedules.stream().map(WorkflowSchedule::id).collect(Collectors.toSet());
- for (var key : workflowScheduleFutures.keySet()) {
- if (!currentIds.contains(key)) {
- cancelWorkflowSchedule(key);
+ var schedules = dbosExecutor.listSchedules(null, null, null);
+ if (logger.isDebugEnabled()) {
+ logger.debug("pollWorkflowSchedules found {} schedules", schedules.size());
+ for (var s : schedules) {
+ logger.debug(
+ " schedule: {} workflow: {} cron: {}", s.scheduleName(), s.workflowName(), s.cron());
+ }
}
- }
- for (var schedule : schedules) {
- var scheduleRunning = workflowScheduleFutures.containsKey(schedule.id());
- if (!schedule.isActive()) {
- // if the schedule is no longer active but we still have a scheduled future for it, cancel
- // it
- if (scheduleRunning) {
- cancelWorkflowSchedule(schedule.id());
- }
- } else if (!scheduleRunning) {
- // if the schedule is active but we don't yet have a scheduled future for it, schedule it
- // now
- var optRegWf =
- dbosExecutor.getRegisteredWorkflow(schedule.workflowName(), schedule.className(), "");
- if (optRegWf.isEmpty()) {
- logger.error(
- "Workflow schedule {} has missing workflow function {}",
- schedule.scheduleName(),
- RegisteredWorkflow.fullyQualifiedName(schedule.workflowName(), schedule.className()));
- continue;
+ // shut down any scheduled future that isn't in the list of current schedules
+ var currentIds = schedules.stream().map(WorkflowSchedule::id).collect(Collectors.toSet());
+ for (var key : workflowScheduleFutures.keySet()) {
+ if (!currentIds.contains(key)) {
+ cancelWorkflowSchedule(key);
}
+ }
- var regWorkflow = optRegWf.orElseThrow();
- if (!Arrays.equals(regWorkflow.workflowMethod().getParameterTypes(), EXPECTED_PARAMETERS)) {
- logger.error(
- "Workflow schedule {} workflow {} has invalid signature, signature must be (Instant, Object)",
- schedule.scheduleName(),
- regWorkflow.fullyQualifiedName());
- continue;
- }
+ for (var schedule : schedules) {
+ var scheduleRunning = workflowScheduleFutures.containsKey(schedule.id());
+ if (!schedule.isActive()) {
+ // if the schedule is no longer active but we still have a scheduled future for it, cancel
+ // it
+ if (scheduleRunning) {
+ cancelWorkflowSchedule(schedule.id());
+ }
+ } else if (!scheduleRunning) {
+ // if the schedule is active but we don't yet have a scheduled future for it, schedule it
+ // now
+ var optRegWf =
+ dbosExecutor.getRegisteredWorkflow(schedule.workflowName(), schedule.className(), "");
+ if (optRegWf.isEmpty()) {
+ logger.error(
+ "Workflow schedule {} has missing workflow function {}",
+ schedule.scheduleName(),
+ RegisteredWorkflow.fullyQualifiedName(
+ schedule.workflowName(), schedule.className()));
+ continue;
+ }
- final String queueName =
- Objects.requireNonNullElse(schedule.queueName(), Constants.DBOS_INTERNAL_QUEUE);
- if (dbosExecutor.getQueue(queueName).isEmpty()) {
- logger.error(
- "Workflow schedule {} has invalid queue {}", schedule.scheduleName(), queueName);
- continue;
- }
+ var regWorkflow = optRegWf.orElseThrow();
+ if (!Arrays.equals(
+ regWorkflow.workflowMethod().getParameterTypes(), EXPECTED_PARAMETERS)) {
+ logger.error(
+ "Workflow schedule {} workflow {} has invalid signature, signature must be (Instant, Object)",
+ schedule.scheduleName(),
+ regWorkflow.fullyQualifiedName());
+ continue;
+ }
- Cron cron;
- try {
- cron = CRON_PARSER.parse(schedule.cron()).validate();
- } catch (Exception e) {
- logger.error(
- "Workflow schedule {} has invalid cron expression {}",
- schedule.scheduleName(),
- schedule.cron(),
- e);
- continue;
- }
+ final String queueName =
+ Objects.requireNonNullElse(schedule.queueName(), Constants.DBOS_INTERNAL_QUEUE);
+ if (dbosExecutor.findQueue(queueName).isEmpty()) {
+ logger.error(
+ "Workflow schedule {} has invalid queue {}", schedule.scheduleName(), queueName);
+ continue;
+ }
- if (schedule.automaticBackfill()
- && schedule.lastFiredAt() != null
- && schedule.lastFiredAt().isBefore(Instant.now())) {
- dbosExecutor.backfillSchedule(
- schedule.scheduleName(), schedule.lastFiredAt(), Instant.now());
- }
+ Cron cron;
+ try {
+ cron = CRON_PARSER.parse(schedule.cron()).validate();
+ } catch (Exception e) {
+ logger.error(
+ "Workflow schedule {} has invalid cron expression {}",
+ schedule.scheduleName(),
+ schedule.cron(),
+ e);
+ continue;
+ }
- var task =
- new Runnable() {
+ if (schedule.automaticBackfill()
+ && schedule.lastFiredAt() != null
+ && schedule.lastFiredAt().isBefore(Instant.now())) {
+ dbosExecutor.backfillSchedule(
+ schedule.scheduleName(), schedule.lastFiredAt(), Instant.now());
+ }
- final ZoneId timeZone =
- Objects.requireNonNullElseGet(
- schedule.cronTimezone(), () -> ZoneId.systemDefault());
- final WorkflowSchedule wfSchedule = schedule;
- final ExecutionTime executionTime = ExecutionTime.forCron(cron);
+ var task =
+ new Runnable() {
- ZonedDateTime nextTime = ZonedDateTime.now(timeZone);
+ final ZoneId timeZone =
+ Objects.requireNonNullElseGet(
+ schedule.cronTimezone(), () -> ZoneId.systemDefault());
+ final WorkflowSchedule wfSchedule = schedule;
+ final ExecutionTime executionTime = ExecutionTime.forCron(cron);
- public void schedule() {
- executionTime
- .nextExecution(nextTime)
- .ifPresent(
- cronTime -> {
- this.nextTime = cronTime.truncatedTo(ChronoUnit.SECONDS);
- var prevFuture =
- workflowScheduleFutures.put(
- wfSchedule.id(), scheduleTask(this.nextTime, this));
- // prevFuture should be null or a scheduled task that already fired.
- // cancel it anyway just to be sure
- if (prevFuture != null) {
- if (!prevFuture.isDone()) {
- logger.debug(
- "Previous scheduled task for {} has not yet completed",
- wfSchedule.scheduleName());
- }
- prevFuture.cancel(false);
- }
- });
- }
+ ZonedDateTime nextTime = ZonedDateTime.now(timeZone);
- @Override
- public void run() {
- // if execServiceRef is null, the scheduler service was shut down so don't start the
- // workflow or schedule the next execution
- if (execServiceRef.get() == null) {
- return;
+ public void schedule() {
+ executionTime
+ .nextExecution(nextTime)
+ .ifPresent(
+ cronTime -> {
+ this.nextTime = cronTime.truncatedTo(ChronoUnit.SECONDS);
+ var prevFuture =
+ workflowScheduleFutures.put(
+ wfSchedule.id(), scheduleTask(this.nextTime, this));
+ // prevFuture should be null or a scheduled task that already fired.
+ // cancel it anyway just to be sure
+ if (prevFuture != null) {
+ if (!prevFuture.isDone()) {
+ logger.debug(
+ "Previous scheduled task for {} has not yet completed",
+ wfSchedule.scheduleName());
+ }
+ prevFuture.cancel(false);
+ }
+ });
}
- try {
- if (paused.get()) {
+ @Override
+ public void run() {
+ // if execServiceRef is null, the scheduler service was shut down so don't start
+ // the workflow or schedule the next execution
+ if (execServiceRef.get() == null) {
+ return;
+ }
+
+ try {
+ if (paused.get()) {
+ logger.debug(
+ "Skipping scheduled workflow {} schedule {} because scheduler is paused",
+ regWorkflow.fullyQualifiedName(),
+ wfSchedule.scheduleName());
+ return;
+ }
+ var args = new Object[] {nextTime.toInstant(), wfSchedule.context()};
+ var workflowId =
+ "sched-%s-%s"
+ .formatted(wfSchedule.scheduleName(), nextTime.toOffsetDateTime());
logger.debug(
- "Skipping scheduled workflow {} schedule {} because scheduler is paused",
+ "Queuing scheduled workflow {} schedule {} workflowId {}",
regWorkflow.fullyQualifiedName(),
- wfSchedule.scheduleName());
- return;
+ wfSchedule.scheduleName(),
+ workflowId);
+ var appVersion = dbosExecutor.getLatestApplicationVersion().versionName();
+ var options =
+ new StartWorkflowOptions(workflowId)
+ .withQueue(queueName)
+ .withAppVersion(appVersion);
+ dbosExecutor.startRegisteredWorkflow(regWorkflow, args, options);
+ systemDatabase.updateScheduleLastFiredAt(
+ wfSchedule.scheduleName(), nextTime.toInstant());
+ } catch (Exception e) {
+ logger.error("Scheduled task {} exception", schedule.scheduleName(), e);
+ } finally {
+ schedule();
}
- var args = new Object[] {nextTime.toInstant(), wfSchedule.context()};
- var workflowId =
- "sched-%s-%s"
- .formatted(wfSchedule.scheduleName(), nextTime.toOffsetDateTime());
- logger.debug(
- "Queuing scheduled workflow {} schedule {} workflowId {}",
- regWorkflow.fullyQualifiedName(),
- wfSchedule.scheduleName(),
- workflowId);
- var appVersion = dbosExecutor.getLatestApplicationVersion().versionName();
- var options =
- new StartWorkflowOptions(workflowId)
- .withQueue(queueName)
- .withAppVersion(appVersion);
- dbosExecutor.startRegisteredWorkflow(regWorkflow, args, options);
- systemDatabase.updateScheduleLastFiredAt(
- wfSchedule.scheduleName(), nextTime.toInstant());
- } catch (Exception e) {
- logger.error("Scheduled task {} exception", schedule.scheduleName(), e);
- } finally {
- schedule();
}
- }
- };
+ };
- task.schedule();
+ task.schedule();
+ }
}
+ } catch (Exception e) {
+ // Catch all exceptions to prevent scheduleAtFixedRate from permanently suppressing future
+ // poll invocations. A transient DB failure should not permanently disable the scheduler.
+ logger.error("pollWorkflowSchedules failed", e);
}
}
diff --git a/transact/src/main/java/dev/dbos/transact/workflow/Field.java b/transact/src/main/java/dev/dbos/transact/workflow/Field.java
new file mode 100644
index 00000000..d7f75c84
--- /dev/null
+++ b/transact/src/main/java/dev/dbos/transact/workflow/Field.java
@@ -0,0 +1,31 @@
+package dev.dbos.transact.workflow;
+
+import org.jspecify.annotations.Nullable;
+
+/**
+ * Three-state field for partial updates: absent (don't touch), present with a value, or present
+ * with null (clear the column).
+ */
+public sealed interface Field permits Field.Absent, Field.Present {
+
+ record Absent() implements Field {}
+
+ record Present(@Nullable T value) implements Field {}
+
+ static Field absent() {
+ return new Absent<>();
+ }
+
+ static Field of(@Nullable T value) {
+ return new Present<>(value);
+ }
+
+ default boolean isPresent() {
+ return this instanceof Present;
+ }
+
+ default @Nullable T get() {
+ if (this instanceof Present p) return p.value();
+ throw new IllegalStateException("Field.get() called on an absent field");
+ }
+}
diff --git a/transact/src/main/java/dev/dbos/transact/workflow/Queue.java b/transact/src/main/java/dev/dbos/transact/workflow/Queue.java
index 46ab31f8..2ce4b37d 100644
--- a/transact/src/main/java/dev/dbos/transact/workflow/Queue.java
+++ b/transact/src/main/java/dev/dbos/transact/workflow/Queue.java
@@ -2,35 +2,45 @@
import java.time.Duration;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.jspecify.annotations.NonNull;
+import org.jspecify.annotations.Nullable;
/**
* Property definition for a DBOS workflow queue. Provides options for a name, concurrency and rate
* limits, prioritization behavior and partitioned behavior
*/
public record Queue(
- String name,
- Integer concurrency,
- Integer workerConcurrency,
+ @NonNull String name,
+ @Nullable Integer concurrency,
+ @Nullable Integer workerConcurrency,
boolean priorityEnabled,
boolean partitioningEnabled,
- RateLimit rateLimit) {
+ @Nullable RateLimit rateLimit,
+ @NonNull Duration pollingInterval) {
+
+ public static final Duration DEFAULT_POLLING_INTERVAL = Duration.ofSeconds(1);
/** Rate limit parameter structure for DBOS workflow queues */
public record RateLimit(int limit, Duration period) {}
public Queue {
Objects.requireNonNull(name, "Queue name must not be null");
+ Objects.requireNonNull(pollingInterval, "Queue pollingInterval must not be null");
if (concurrency != null && concurrency <= 0)
throw new IllegalArgumentException(
"If specified, queue concurrency must be greater than zero");
if (workerConcurrency != null && workerConcurrency <= 0)
throw new IllegalArgumentException(
"If specified, queue workerConcurrency must be greater than zero");
+ if (pollingInterval.isNegative() || pollingInterval.isZero())
+ throw new IllegalArgumentException("Queue pollingInterval must be greater than zero");
}
/** Construct a queue with a given name */
- public Queue(String name) {
- this(name, null, null, false, false, null);
+ public Queue(@NonNull String name) {
+ this(name, null, null, false, false, null, DEFAULT_POLLING_INTERVAL);
}
/**
@@ -41,48 +51,84 @@ public boolean hasLimiter() {
}
/** Produces a new Queue with the assigned name. */
- public Queue withName(String name) {
+ public Queue withName(@NonNull String name) {
return new Queue(
- name, concurrency, workerConcurrency, priorityEnabled, partitioningEnabled, rateLimit);
+ name,
+ concurrency,
+ workerConcurrency,
+ priorityEnabled,
+ partitioningEnabled,
+ rateLimit,
+ pollingInterval);
}
/**
* Produces a new Queue with the assigned global concurrency. `null` may be specified to remove
* the concurrency limit.
*/
- public Queue withConcurrency(Integer concurrency) {
+ public Queue withConcurrency(@Nullable Integer concurrency) {
return new Queue(
- name, concurrency, workerConcurrency, priorityEnabled, partitioningEnabled, rateLimit);
+ name,
+ concurrency,
+ workerConcurrency,
+ priorityEnabled,
+ partitioningEnabled,
+ rateLimit,
+ pollingInterval);
}
/**
* Produces a new Queue with the assigned per-worker concurrency. `null` may be specified to
* remove the concurrency limit.
*/
- public Queue withWorkerConcurrency(Integer workerConcurrency) {
+ public Queue withWorkerConcurrency(@Nullable Integer workerConcurrency) {
return new Queue(
- name, concurrency, workerConcurrency, priorityEnabled, partitioningEnabled, rateLimit);
+ name,
+ concurrency,
+ workerConcurrency,
+ priorityEnabled,
+ partitioningEnabled,
+ rateLimit,
+ pollingInterval);
}
/** Produces a new Queue with the prioritization enabled/disabled. */
public Queue withPriorityEnabled(boolean priorityEnabled) {
return new Queue(
- name, concurrency, workerConcurrency, priorityEnabled, partitioningEnabled, rateLimit);
+ name,
+ concurrency,
+ workerConcurrency,
+ priorityEnabled,
+ partitioningEnabled,
+ rateLimit,
+ pollingInterval);
}
/** Produces a new Queue with the partitioned enabled/disabled. */
public Queue withPartitioningEnabled(boolean partitioningEnabled) {
return new Queue(
- name, concurrency, workerConcurrency, priorityEnabled, partitioningEnabled, rateLimit);
+ name,
+ concurrency,
+ workerConcurrency,
+ priorityEnabled,
+ partitioningEnabled,
+ rateLimit,
+ pollingInterval);
}
/**
* Produces a new Queue with the assigned rate limit. `null` may be specified to remove the rate
* limit.
*/
- public Queue withRateLimit(RateLimit rateLimit) {
+ public Queue withRateLimit(@Nullable RateLimit rateLimit) {
return new Queue(
- name, concurrency, workerConcurrency, priorityEnabled, partitioningEnabled, rateLimit);
+ name,
+ concurrency,
+ workerConcurrency,
+ priorityEnabled,
+ partitioningEnabled,
+ rateLimit,
+ pollingInterval);
}
/**
@@ -92,10 +138,20 @@ public Queue withRateLimit(int limit, Duration period) {
return withRateLimit(new RateLimit(limit, period));
}
- /**
- * Produces a new Queue with the assigned rate limit, expressed in workflows per period (seconds).
- */
- public Queue withRateLimit(int limit, double period) {
- return withRateLimit(new RateLimit(limit, Duration.ofMillis((long) (period * 1000))));
+ /** Produces a new Queue with the assigned rate limit, expressed in workflows per period. */
+ public Queue withRateLimit(int limit, long period, TimeUnit unit) {
+ return withRateLimit(new RateLimit(limit, Duration.of(period, unit.toChronoUnit())));
+ }
+
+ /** Produces a new Queue with the assigned polling interval. */
+ public Queue withPollingInterval(@NonNull Duration pollingInterval) {
+ return new Queue(
+ name,
+ concurrency,
+ workerConcurrency,
+ priorityEnabled,
+ partitioningEnabled,
+ rateLimit,
+ pollingInterval);
}
}
diff --git a/transact/src/main/java/dev/dbos/transact/workflow/QueueConflictResolution.java b/transact/src/main/java/dev/dbos/transact/workflow/QueueConflictResolution.java
new file mode 100644
index 00000000..5aee7d5a
--- /dev/null
+++ b/transact/src/main/java/dev/dbos/transact/workflow/QueueConflictResolution.java
@@ -0,0 +1,23 @@
+package dev.dbos.transact.workflow;
+
+/**
+ * Controls what happens when {@code registerQueue} is called for a queue that already exists in the
+ * database.
+ */
+public enum QueueConflictResolution {
+ /**
+ * Overwrite the existing queue configuration unconditionally. Default for {@link
+ * dev.dbos.transact.DBOSClient}.
+ */
+ ALWAYS_UPDATE,
+
+ /** Leave the existing queue configuration unchanged. */
+ NEVER_UPDATE,
+
+ /**
+ * Overwrite the existing queue configuration only if the running application version matches the
+ * latest application version registered in the database. Default for {@link
+ * dev.dbos.transact.DBOS}.
+ */
+ UPDATE_IF_LATEST_VERSION,
+}
diff --git a/transact/src/main/java/dev/dbos/transact/workflow/QueueOptions.java b/transact/src/main/java/dev/dbos/transact/workflow/QueueOptions.java
new file mode 100644
index 00000000..1a679cff
--- /dev/null
+++ b/transact/src/main/java/dev/dbos/transact/workflow/QueueOptions.java
@@ -0,0 +1,192 @@
+package dev.dbos.transact.workflow;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.jspecify.annotations.Nullable;
+
+/**
+ * Configuration options for a DBOS workflow queue. Used for both registration of database-backed
+ * queues and partial updates to queue configuration.
+ *
+ * When used for registration, absent and null fields both result in the column being null (no
+ * limit / use default). When used for updates, absent fields are left unchanged in the database
+ * while null-valued fields clear the column.
+ *
+ *
The non-nullable queue properties ({@code priorityEnabled}, {@code partitionQueue}, {@code
+ * pollingInterval}) use {@link Optional} — {@link Optional#empty()} means use the default on
+ * creation or leave unchanged on update; a present value sets the column.
+ */
+public record QueueOptions(
+ Field concurrency,
+ Field workerConcurrency,
+ Field rateLimitMax,
+ Field rateLimitPeriod,
+ Optional priorityEnabled,
+ Optional partitionQueue,
+ Optional pollingInterval) {
+
+ private static final QueueOptions EMPTY =
+ new QueueOptions(
+ Field.absent(),
+ Field.absent(),
+ Field.absent(),
+ Field.absent(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty());
+
+ public static QueueOptions empty() {
+ return EMPTY;
+ }
+
+ public boolean isEmpty() {
+ return !concurrency.isPresent()
+ && !workerConcurrency.isPresent()
+ && !rateLimitMax.isPresent()
+ && !rateLimitPeriod.isPresent()
+ && priorityEnabled.isEmpty()
+ && partitionQueue.isEmpty()
+ && pollingInterval.isEmpty();
+ }
+
+ // ── Static factories ──────────────────────────────────────────────────────
+
+ public static QueueOptions setConcurrency(@Nullable Integer value) {
+ return EMPTY.withConcurrency(Field.of(value));
+ }
+
+ public static QueueOptions setWorkerConcurrency(@Nullable Integer value) {
+ return EMPTY.withWorkerConcurrency(Field.of(value));
+ }
+
+ public static QueueOptions setRateLimit(@Nullable Integer max, @Nullable Duration period) {
+ return EMPTY.withRateLimitMax(Field.of(max)).withRateLimitPeriod(Field.of(period));
+ }
+
+ public static QueueOptions setRateLimit(int limit, long period, TimeUnit unit) {
+ return setRateLimit(limit, Duration.of(period, unit.toChronoUnit()));
+ }
+
+ public static QueueOptions setPriorityEnabled(boolean value) {
+ return EMPTY.withPriorityEnabled(Optional.of(value));
+ }
+
+ public static QueueOptions setPartitionQueue(boolean value) {
+ return EMPTY.withPartitionQueue(Optional.of(value));
+ }
+
+ public static QueueOptions setPollingInterval(Duration value) {
+ return EMPTY.withPollingInterval(Optional.of(value));
+ }
+
+ // ── Builders for chaining ─────────────────────────────────────────────────
+
+ public QueueOptions withConcurrency(Field concurrency) {
+ return new QueueOptions(
+ concurrency,
+ workerConcurrency,
+ rateLimitMax,
+ rateLimitPeriod,
+ priorityEnabled,
+ partitionQueue,
+ pollingInterval);
+ }
+
+ public QueueOptions withWorkerConcurrency(Field workerConcurrency) {
+ return new QueueOptions(
+ concurrency,
+ workerConcurrency,
+ rateLimitMax,
+ rateLimitPeriod,
+ priorityEnabled,
+ partitionQueue,
+ pollingInterval);
+ }
+
+ public QueueOptions withRateLimitMax(Field rateLimitMax) {
+ return new QueueOptions(
+ concurrency,
+ workerConcurrency,
+ rateLimitMax,
+ rateLimitPeriod,
+ priorityEnabled,
+ partitionQueue,
+ pollingInterval);
+ }
+
+ public QueueOptions withRateLimitPeriod(Field rateLimitPeriod) {
+ return new QueueOptions(
+ concurrency,
+ workerConcurrency,
+ rateLimitMax,
+ rateLimitPeriod,
+ priorityEnabled,
+ partitionQueue,
+ pollingInterval);
+ }
+
+ public QueueOptions withPriorityEnabled(Optional priorityEnabled) {
+ return new QueueOptions(
+ concurrency,
+ workerConcurrency,
+ rateLimitMax,
+ rateLimitPeriod,
+ priorityEnabled,
+ partitionQueue,
+ pollingInterval);
+ }
+
+ public QueueOptions withPartitionQueue(Optional partitionQueue) {
+ return new QueueOptions(
+ concurrency,
+ workerConcurrency,
+ rateLimitMax,
+ rateLimitPeriod,
+ priorityEnabled,
+ partitionQueue,
+ pollingInterval);
+ }
+
+ public QueueOptions withPollingInterval(Optional pollingInterval) {
+ return new QueueOptions(
+ concurrency,
+ workerConcurrency,
+ rateLimitMax,
+ rateLimitPeriod,
+ priorityEnabled,
+ partitionQueue,
+ pollingInterval);
+ }
+
+ // ── Convenience chaining methods ──────────────────────────────────────────
+
+ public QueueOptions andConcurrency(@Nullable Integer value) {
+ return withConcurrency(Field.of(value));
+ }
+
+ public QueueOptions andWorkerConcurrency(@Nullable Integer value) {
+ return withWorkerConcurrency(Field.of(value));
+ }
+
+ public QueueOptions andRateLimit(@Nullable Integer max, @Nullable Duration period) {
+ return withRateLimitMax(Field.of(max)).withRateLimitPeriod(Field.of(period));
+ }
+
+ public QueueOptions andRateLimit(int max, long period, TimeUnit unit) {
+ return andRateLimit(max, Duration.of(period, unit.toChronoUnit()));
+ }
+
+ public QueueOptions andPriorityEnabled(boolean value) {
+ return withPriorityEnabled(Optional.of(value));
+ }
+
+ public QueueOptions andPartitionQueue(boolean value) {
+ return withPartitionQueue(Optional.of(value));
+ }
+
+ public QueueOptions andPollingInterval(Duration value) {
+ return withPollingInterval(Optional.of(value));
+ }
+}
diff --git a/transact/src/test/java/dev/dbos/transact/admin/AdminServerTest.java b/transact/src/test/java/dev/dbos/transact/admin/AdminServerTest.java
index bdfcf0f4..9c8d916d 100644
--- a/transact/src/test/java/dev/dbos/transact/admin/AdminServerTest.java
+++ b/transact/src/test/java/dev/dbos/transact/admin/AdminServerTest.java
@@ -32,6 +32,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import io.restassured.response.Response;
import org.junit.jupiter.api.BeforeEach;
@@ -106,7 +107,7 @@ public void ensurePostJsonNotJson() throws IOException {
@Test
public void exceptionCatching500() throws IOException {
var exception = new RuntimeException("test-exception");
- when(mockExec.getQueues()).thenThrow(exception);
+ when(mockExec.getStaticQueues()).thenThrow(exception);
try (var server = new AdminServer(port, mockExec, mockDB)) {
server.start();
@@ -197,9 +198,9 @@ public void queueMetadata() throws IOException {
.withConcurrency(10)
.withWorkerConcurrency(5)
.withPriorityEnabled(true)
- .withRateLimit(2, 4.0);
+ .withRateLimit(2, 4, TimeUnit.SECONDS);
- when(mockExec.getQueues()).thenReturn(List.of(queue1, queue2));
+ when(mockExec.getStaticQueues()).thenReturn(List.of(queue1, queue2));
try (var server = new AdminServer(port, mockExec, mockDB)) {
server.start();
diff --git a/transact/src/test/java/dev/dbos/transact/client/ClientQueueTest.java b/transact/src/test/java/dev/dbos/transact/client/ClientQueueTest.java
new file mode 100644
index 00000000..2487d32a
--- /dev/null
+++ b/transact/src/test/java/dev/dbos/transact/client/ClientQueueTest.java
@@ -0,0 +1,114 @@
+package dev.dbos.transact.client;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import dev.dbos.transact.utils.PgContainer;
+import dev.dbos.transact.workflow.Queue;
+import dev.dbos.transact.workflow.QueueConflictResolution;
+import dev.dbos.transact.workflow.QueueOptions;
+
+import org.junit.jupiter.api.AutoClose;
+import org.junit.jupiter.api.Test;
+
+public class ClientQueueTest {
+
+ @AutoClose final PgContainer pgContainer = new PgContainer();
+
+ @Test
+ public void testClientRegisterAndFindQueue() {
+ try (var client = pgContainer.dbosClient()) {
+ client.registerQueue("cq-find", QueueOptions.setConcurrency(7));
+
+ var q = client.findQueue("cq-find").orElseThrow();
+ assertEquals("cq-find", q.name());
+ assertEquals(7, q.concurrency());
+
+ assertTrue(client.findQueue("cq-does-not-exist").isEmpty());
+ }
+ }
+
+ @Test
+ public void testClientListQueues() {
+ try (var client = pgContainer.dbosClient()) {
+ client.registerQueue("cq-list-1", QueueOptions.setConcurrency(1));
+ client.registerQueue("cq-list-2", QueueOptions.setConcurrency(2));
+ client.registerQueue("cq-list-3", QueueOptions.empty());
+
+ var names = client.listQueues().stream().map(Queue::name).toList();
+ assertTrue(names.contains("cq-list-1"));
+ assertTrue(names.contains("cq-list-2"));
+ assertTrue(names.contains("cq-list-3"));
+ }
+ }
+
+ @Test
+ public void testClientDeleteQueue() {
+ try (var client = pgContainer.dbosClient()) {
+ client.registerQueue("cq-del", QueueOptions.empty());
+ assertTrue(client.findQueue("cq-del").isPresent());
+
+ assertTrue(client.deleteQueue("cq-del"));
+ assertTrue(client.findQueue("cq-del").isEmpty());
+
+ assertFalse(client.deleteQueue("cq-never-existed"));
+ }
+ }
+
+ @Test
+ public void testClientUpdateQueue() {
+ try (var client = pgContainer.dbosClient()) {
+ client.registerQueue("cq-update", QueueOptions.setConcurrency(5));
+ assertEquals(5, client.findQueue("cq-update").orElseThrow().concurrency());
+
+ client.updateQueue("cq-update", QueueOptions.setConcurrency(10));
+ assertEquals(10, client.findQueue("cq-update").orElseThrow().concurrency());
+ }
+ }
+
+ @Test
+ public void testClientNeverUpdate() {
+ try (var client = pgContainer.dbosClient()) {
+ client.registerQueue("cq-never", QueueOptions.setConcurrency(5));
+ client.registerQueue(
+ "cq-never", QueueOptions.setConcurrency(99), QueueConflictResolution.NEVER_UPDATE);
+
+ assertEquals(5, client.findQueue("cq-never").orElseThrow().concurrency());
+ }
+ }
+
+ @Test
+ public void testClientAlwaysUpdate() {
+ try (var client = pgContainer.dbosClient()) {
+ client.registerQueue("cq-always", QueueOptions.setConcurrency(5));
+ client.registerQueue(
+ "cq-always", QueueOptions.setConcurrency(99), QueueConflictResolution.ALWAYS_UPDATE);
+
+ assertEquals(99, client.findQueue("cq-always").orElseThrow().concurrency());
+ }
+ }
+
+ @Test
+ public void testClientRejectsUpdateIfLatestVersion() {
+ try (var client = pgContainer.dbosClient()) {
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ client.registerQueue(
+ "cq-version",
+ QueueOptions.empty(),
+ QueueConflictResolution.UPDATE_IF_LATEST_VERSION));
+ }
+ }
+
+ @Test
+ public void testClientRejectsInternalQueueName() {
+ try (var client = pgContainer.dbosClient()) {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> client.registerQueue("_dbos_internal_queue", QueueOptions.empty()));
+ }
+ }
+}
diff --git a/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java b/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java
index 937d7547..3c5dcf17 100644
--- a/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java
+++ b/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java
@@ -3265,4 +3265,136 @@ public void canGetWorkflowStreamsThrows() throws Exception {
assertEquals(0, json.get("streams").size());
}
}
+
+ @RetryingTest(3)
+ public void canListQueues() throws Exception {
+ MessageListener listener = new MessageListener();
+ testServer.setListener(listener);
+
+ dev.dbos.transact.workflow.Queue q1 =
+ new dev.dbos.transact.workflow.Queue("queue-1")
+ .withConcurrency(5)
+ .withWorkerConcurrency(2)
+ .withRateLimit(10, Duration.ofSeconds(60))
+ .withPriorityEnabled(true)
+ .withPartitioningEnabled(true)
+ .withPollingInterval(Duration.ofMillis(500));
+ dev.dbos.transact.workflow.Queue q2 = new dev.dbos.transact.workflow.Queue("queue-2");
+ when(mockDB.listQueues()).thenReturn(List.of(q1, q2));
+
+ try (Conductor conductor = builder.build()) {
+ conductor.start();
+ assertTrue(listener.openLatch.await(5, TimeUnit.SECONDS), "open latch timed out");
+
+ listener.send(MessageType.LIST_QUEUES, "req-list-queues", Map.of());
+ assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out");
+
+ verify(mockDB).listQueues();
+
+ JsonNode json = mapper.readTree(listener.message);
+ assertEquals("list_queues", json.get("type").asText());
+ assertEquals("req-list-queues", json.get("request_id").asText());
+ assertNull(json.get("error_message"));
+
+ JsonNode output = json.get("output");
+ assertNotNull(output);
+ assertTrue(output.isArray());
+ assertEquals(2, output.size());
+
+ JsonNode first = output.get(0);
+ assertEquals("queue-1", first.get("name").asText());
+ assertEquals(5, first.get("concurrency").asInt());
+ assertEquals(2, first.get("worker_concurrency").asInt());
+ assertEquals(10, first.get("rate_limit_max").asInt());
+ assertEquals(60.0, first.get("rate_limit_period_sec").asDouble(), 0.001);
+ assertTrue(first.get("priority_enabled").asBoolean());
+ assertTrue(first.get("partition_queue").asBoolean());
+ assertEquals(0.5, first.get("polling_interval_sec").asDouble(), 0.001);
+
+ JsonNode second = output.get(1);
+ assertEquals("queue-2", second.get("name").asText());
+ assertTrue(second.get("concurrency").isNull());
+ assertTrue(second.get("worker_concurrency").isNull());
+ assertTrue(second.get("rate_limit_max").isNull());
+ assertTrue(second.get("rate_limit_period_sec").isNull());
+ assertFalse(second.get("priority_enabled").asBoolean());
+ assertFalse(second.get("partition_queue").asBoolean());
+ assertEquals(1.0, second.get("polling_interval_sec").asDouble(), 0.001);
+ }
+ }
+
+ @RetryingTest(3)
+ public void canListQueuesThrows() throws Exception {
+ MessageListener listener = new MessageListener();
+ testServer.setListener(listener);
+
+ String errorMessage = "canListQueuesThrows error";
+ doThrow(new RuntimeException(errorMessage)).when(mockDB).listQueues();
+
+ try (Conductor conductor = builder.build()) {
+ conductor.start();
+ assertTrue(listener.openLatch.await(5, TimeUnit.SECONDS), "open latch timed out");
+
+ listener.send(MessageType.LIST_QUEUES, "req-list-queues-err", Map.of());
+ assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out");
+
+ JsonNode json = mapper.readTree(listener.message);
+ assertEquals("list_queues", json.get("type").asText());
+ assertEquals(errorMessage, json.get("error_message").asText());
+ assertEquals(0, json.get("output").size());
+ }
+ }
+
+ @RetryingTest(3)
+ public void canGetQueue() throws Exception {
+ MessageListener listener = new MessageListener();
+ testServer.setListener(listener);
+
+ dev.dbos.transact.workflow.Queue queue =
+ new dev.dbos.transact.workflow.Queue("my-queue").withConcurrency(3);
+ when(mockDB.findQueue("my-queue")).thenReturn(Optional.of(queue));
+
+ try (Conductor conductor = builder.build()) {
+ conductor.start();
+ assertTrue(listener.openLatch.await(5, TimeUnit.SECONDS), "open latch timed out");
+
+ listener.send(MessageType.GET_QUEUE, "req-get-queue", Map.of("name", "my-queue"));
+ assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out");
+
+ verify(mockDB).findQueue("my-queue");
+
+ JsonNode json = mapper.readTree(listener.message);
+ assertEquals("get_queue", json.get("type").asText());
+ assertEquals("req-get-queue", json.get("request_id").asText());
+ assertNull(json.get("error_message"));
+
+ JsonNode output = json.get("output");
+ assertNotNull(output);
+ assertEquals("my-queue", output.get("name").asText());
+ assertEquals(3, output.get("concurrency").asInt());
+ assertTrue(output.get("worker_concurrency").isNull());
+ }
+ }
+
+ @RetryingTest(3)
+ public void canGetQueueNotFound() throws Exception {
+ MessageListener listener = new MessageListener();
+ testServer.setListener(listener);
+
+ when(mockDB.findQueue("nonexistent")).thenReturn(Optional.empty());
+
+ try (Conductor conductor = builder.build()) {
+ conductor.start();
+ assertTrue(listener.openLatch.await(5, TimeUnit.SECONDS), "open latch timed out");
+
+ listener.send(MessageType.GET_QUEUE, "req-get-queue-404", Map.of("name", "nonexistent"));
+ assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out");
+
+ JsonNode json = mapper.readTree(listener.message);
+ assertEquals("get_queue", json.get("type").asText());
+ assertEquals("req-get-queue-404", json.get("request_id").asText());
+ assertNull(json.get("error_message"));
+ assertTrue(!json.has("output") || json.get("output").isNull());
+ }
+ }
}
diff --git a/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java b/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java
index 73396c7c..0a737276 100644
--- a/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java
+++ b/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java
@@ -23,6 +23,8 @@
import dev.dbos.transact.workflow.ExportedWorkflow;
import dev.dbos.transact.workflow.ForkOptions;
import dev.dbos.transact.workflow.GetWorkflowAggregatesInput;
+import dev.dbos.transact.workflow.Queue;
+import dev.dbos.transact.workflow.QueueOptions;
import dev.dbos.transact.workflow.ScheduleStatus;
import dev.dbos.transact.workflow.VersionInfo;
import dev.dbos.transact.workflow.WorkflowDelay;
@@ -1566,4 +1568,165 @@ public void testGetAllNotificationsEmpty() throws Exception {
var notifications = sysdb.getAllNotifications(wfId);
assertTrue(notifications.isEmpty());
}
+
+ // --- Queue CRUD tests ---
+
+ @Test
+ public void testUpsertQueueInsert() {
+ var options =
+ QueueOptions.setConcurrency(5)
+ .andWorkerConcurrency(2)
+ .andPriorityEnabled(true)
+ .andRateLimit(10, 60, java.util.concurrent.TimeUnit.SECONDS);
+
+ boolean inserted = sysdb.upsertQueue("q-insert", options, true);
+ assertTrue(inserted, "upsertQueue should return true when the row is new");
+
+ var fetched = sysdb.findQueue("q-insert");
+ assertTrue(fetched.isPresent());
+ var q = fetched.get();
+ assertEquals("q-insert", q.name());
+ assertEquals(5, q.concurrency());
+ assertEquals(2, q.workerConcurrency());
+ assertTrue(q.priorityEnabled());
+ assertNotNull(q.rateLimit());
+ assertEquals(10, q.rateLimit().limit());
+ assertEquals(Duration.ofSeconds(60), q.rateLimit().period());
+ }
+
+ @Test
+ public void testUpsertQueueOptionsExisting() {
+ sysdb.upsertQueue("q-update", QueueOptions.setConcurrency(3), true);
+
+ boolean inserted =
+ sysdb.upsertQueue("q-update", QueueOptions.setConcurrency(7).andWorkerConcurrency(4), true);
+ assertFalse(inserted, "upsertQueue should return false when the row already existed");
+
+ var fetched = sysdb.findQueue("q-update").orElseThrow();
+ assertEquals(7, fetched.concurrency());
+ assertEquals(4, fetched.workerConcurrency());
+ }
+
+ @Test
+ public void testUpsertQueueNoUpdateExisting() {
+ sysdb.upsertQueue("q-no-update", QueueOptions.setConcurrency(3), true);
+
+ boolean inserted = sysdb.upsertQueue("q-no-update", QueueOptions.setConcurrency(99), false);
+ assertFalse(inserted, "upsertQueue should return false when the row already existed");
+
+ var fetched = sysdb.findQueue("q-no-update").orElseThrow();
+ assertEquals(
+ 3, fetched.concurrency(), "concurrency should be unchanged when updateExisting=false");
+ }
+
+ @Test
+ public void testGetQueueFromDBMissing() {
+ var result = sysdb.findQueue("does-not-exist");
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testListQueuesFromDB() {
+ sysdb.upsertQueue("q-list-a", QueueOptions.setConcurrency(1), true);
+ sysdb.upsertQueue("q-list-b", QueueOptions.setConcurrency(2), true);
+ sysdb.upsertQueue("q-list-c", QueueOptions.empty(), true);
+
+ var queues = sysdb.listQueues();
+ var names = queues.stream().map(Queue::name).toList();
+ assertTrue(names.contains("q-list-a"));
+ assertTrue(names.contains("q-list-b"));
+ assertTrue(names.contains("q-list-c"));
+ }
+
+ @Test
+ public void testDeleteQueue() {
+ sysdb.upsertQueue("q-delete", QueueOptions.setConcurrency(1), true);
+ assertTrue(sysdb.findQueue("q-delete").isPresent());
+
+ boolean deleted = sysdb.deleteQueue("q-delete");
+ assertTrue(deleted);
+ assertTrue(sysdb.findQueue("q-delete").isEmpty());
+ }
+
+ @Test
+ public void testDeleteQueueMissing() {
+ assertFalse(sysdb.deleteQueue("q-never-existed"));
+ }
+
+ @Test
+ public void testUpdateQueuePartialConcurrency() {
+ sysdb.upsertQueue(
+ "q-partial",
+ QueueOptions.setConcurrency(5)
+ .andPriorityEnabled(true)
+ .andRateLimit(10, 60, java.util.concurrent.TimeUnit.SECONDS),
+ true);
+
+ sysdb.updateQueue("q-partial", QueueOptions.setConcurrency(99));
+
+ var q = sysdb.findQueue("q-partial").orElseThrow();
+ assertEquals(99, q.concurrency(), "concurrency should be updated");
+ assertTrue(q.priorityEnabled(), "priorityEnabled should be unchanged");
+ assertNotNull(q.rateLimit(), "rateLimit should be unchanged");
+ assertEquals(10, q.rateLimit().limit());
+ }
+
+ @Test
+ public void testUpdateQueueClearConcurrency() {
+ sysdb.upsertQueue("q-clear-conc", QueueOptions.setConcurrency(5), true);
+
+ sysdb.updateQueue("q-clear-conc", QueueOptions.setConcurrency(null));
+
+ var q = sysdb.findQueue("q-clear-conc").orElseThrow();
+ assertNull(q.concurrency(), "concurrency should be cleared to null");
+ }
+
+ @Test
+ public void testUpdateQueueClearRateLimit() {
+ sysdb.upsertQueue(
+ "q-clear-rate",
+ QueueOptions.setRateLimit(5, 30, java.util.concurrent.TimeUnit.SECONDS),
+ true);
+
+ sysdb.updateQueue("q-clear-rate", QueueOptions.setRateLimit(null, null));
+
+ var q = sysdb.findQueue("q-clear-rate").orElseThrow();
+ assertNull(q.rateLimit(), "rateLimit should be cleared to null");
+ }
+
+ @Test
+ public void testUpdateQueueEmpty() {
+ sysdb.upsertQueue("q-empty-update", QueueOptions.setConcurrency(5), true);
+
+ // Empty update should be a no-op (no exception, no change)
+ var emptyUpdate = QueueOptions.empty();
+ sysdb.updateQueue("q-empty-update", emptyUpdate);
+
+ var q = sysdb.findQueue("q-empty-update").orElseThrow();
+ assertEquals(5, q.concurrency());
+ }
+
+ @Test
+ public void testUpsertQueueRoundTrip() {
+ sysdb.upsertQueue(
+ "q-roundtrip",
+ QueueOptions.setConcurrency(8)
+ .andWorkerConcurrency(4)
+ .andPriorityEnabled(true)
+ .andPartitionQueue(true)
+ .andRateLimit(20, 30, java.util.concurrent.TimeUnit.SECONDS)
+ .andPollingInterval(Duration.ofSeconds(5)),
+ true);
+ var fetched = sysdb.findQueue("q-roundtrip").orElseThrow();
+
+ assertEquals("q-roundtrip", fetched.name());
+ assertEquals(8, fetched.concurrency());
+ assertEquals(4, fetched.workerConcurrency());
+ assertTrue(fetched.priorityEnabled());
+ assertTrue(fetched.partitioningEnabled());
+ assertNotNull(fetched.rateLimit());
+ assertEquals(20, fetched.rateLimit().limit());
+ assertEquals(Duration.ofSeconds(30), fetched.rateLimit().period());
+ assertEquals(Duration.ofSeconds(5), fetched.pollingInterval());
+ }
}
diff --git a/transact/src/test/java/dev/dbos/transact/queue/DynamicQueuesTest.java b/transact/src/test/java/dev/dbos/transact/queue/DynamicQueuesTest.java
new file mode 100644
index 00000000..e8be3e6c
--- /dev/null
+++ b/transact/src/test/java/dev/dbos/transact/queue/DynamicQueuesTest.java
@@ -0,0 +1,1003 @@
+package dev.dbos.transact.queue;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import dev.dbos.transact.Constants;
+import dev.dbos.transact.DBOS;
+import dev.dbos.transact.DBOSTestAccess;
+import dev.dbos.transact.StartWorkflowOptions;
+import dev.dbos.transact.config.DBOSConfig;
+import dev.dbos.transact.json.SerializationUtil;
+import dev.dbos.transact.utils.DBUtils;
+import dev.dbos.transact.utils.PgContainer;
+import dev.dbos.transact.utils.WorkflowStatusInternalBuilder;
+import dev.dbos.transact.workflow.ListWorkflowsInput;
+import dev.dbos.transact.workflow.Queue;
+import dev.dbos.transact.workflow.QueueConflictResolution;
+import dev.dbos.transact.workflow.QueueOptions;
+import dev.dbos.transact.workflow.WorkflowHandle;
+import dev.dbos.transact.workflow.WorkflowState;
+import dev.dbos.transact.workflow.WorkflowStatus;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BooleanSupplier;
+
+import com.zaxxer.hikari.HikariDataSource;
+import org.junit.jupiter.api.AutoClose;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DynamicQueuesTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(DynamicQueuesTest.class);
+
+ @AutoClose final PgContainer pgContainer = new PgContainer();
+
+ DBOSConfig dbosConfig;
+ @AutoClose DBOS dbos;
+ @AutoClose HikariDataSource dataSource;
+
+ @BeforeEach
+ void beforeEach() {
+ dbosConfig = pgContainer.dbosConfig();
+ dbos = new DBOS(dbosConfig);
+ dataSource = pgContainer.dataSource();
+ }
+
+ @Test
+ public void testDynamicQueueWorkflowExecution() throws Exception {
+ ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl());
+ dbos.launch();
+
+ var qs = DBOSTestAccess.getQueueService(dbos);
+ qs.setSpeedupForTest();
+
+ // Register a dynamic queue after launch — this writes to DB.
+ dbos.registerQueue("dynQueue", QueueOptions.empty());
+
+ // The supervisor polls every 1s; wait for it to discover and start a listener.
+ var handle =
+ dbos.startWorkflow(
+ () -> serviceQ.simpleQWorkflow("hello"),
+ new StartWorkflowOptions().withQueue("dynQueue"));
+
+ assertEquals("hellohello", handle.getResult());
+ assertEquals(WorkflowState.SUCCESS, handle.getStatus().status());
+ }
+
+ @Test
+ public void testDynamicQueueConcurrency() throws Exception {
+ ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl());
+ dbos.launch();
+
+ var qs = DBOSTestAccess.getQueueService(dbos);
+ qs.setSpeedupForTest();
+
+ dbos.registerQueue("concQ", QueueOptions.setConcurrency(1).andWorkerConcurrency(1));
+
+ for (int i = 0; i < 3; i++) {
+ String id = "dynwf" + i;
+ String input = "v" + i;
+ dbos.startWorkflow(
+ () -> serviceQ.simpleQWorkflow(input), new StartWorkflowOptions(id).withQueue("concQ"));
+ }
+
+ for (int i = 0; i < 3; i++) {
+ var handle = dbos.retrieveWorkflow("dynwf" + i);
+ assertEquals("v" + i + "v" + i, handle.getResult());
+ assertEquals(WorkflowState.SUCCESS, handle.getStatus().status());
+ }
+ }
+
+ @Test
+ public void testListQueues() throws Exception {
+ dbos.launch();
+
+ dbos.registerQueue("q-list-1", QueueOptions.setConcurrency(1));
+ dbos.registerQueue("q-list-2", QueueOptions.setConcurrency(2));
+ dbos.registerQueue("q-list-3", QueueOptions.empty());
+
+ var queues = dbos.listQueues();
+ var names = queues.stream().map(Queue::name).toList();
+ assertTrue(names.contains("q-list-1"));
+ assertTrue(names.contains("q-list-2"));
+ assertTrue(names.contains("q-list-3"));
+ assertEquals(3, names.size());
+ }
+
+ @Test
+ public void testDeleteQueue() throws Exception {
+ dbos.launch();
+
+ dbos.registerQueue("q-del", QueueOptions.setConcurrency(1));
+ assertTrue(dbos.listQueues().stream().anyMatch(q -> q.name().equals("q-del")));
+
+ boolean deleted = dbos.deleteQueue("q-del");
+ assertTrue(deleted);
+ assertFalse(dbos.listQueues().stream().anyMatch(q -> q.name().equals("q-del")));
+
+ // deleting a non-existent queue returns false
+ assertFalse(dbos.deleteQueue("q-never-existed"));
+ }
+
+ @Test
+ public void testUpdateQueue() throws Exception {
+ dbos.launch();
+
+ dbos.registerQueue("q-update", QueueOptions.setConcurrency(5));
+
+ var before =
+ dbos.listQueues().stream()
+ .filter(x -> x.name().equals("q-update"))
+ .findFirst()
+ .orElseThrow();
+ assertEquals(5, before.concurrency());
+
+ dbos.updateQueue("q-update", QueueOptions.setConcurrency(10));
+
+ var after =
+ dbos.listQueues().stream()
+ .filter(x -> x.name().equals("q-update"))
+ .findFirst()
+ .orElseThrow();
+ assertEquals(10, after.concurrency());
+ }
+
+ @Test
+ public void testRegisterQueueNeverUpdate() throws Exception {
+ dbos.launch();
+
+ dbos.registerQueue("q-conflict", QueueOptions.setConcurrency(5));
+
+ // NEVER_UPDATE: second call should not overwrite
+ dbos.registerQueue(
+ "q-conflict", QueueOptions.setConcurrency(99), QueueConflictResolution.NEVER_UPDATE);
+
+ var q =
+ dbos.listQueues().stream()
+ .filter(x -> x.name().equals("q-conflict"))
+ .findFirst()
+ .orElseThrow();
+ assertEquals(5, q.concurrency());
+ }
+
+ @Test
+ public void testRegisterQueueAlwaysUpdate() throws Exception {
+ dbos.launch();
+
+ dbos.registerQueue("q-always", QueueOptions.setConcurrency(5));
+
+ // ALWAYS_UPDATE: second call should overwrite
+ dbos.registerQueue(
+ "q-always", QueueOptions.setConcurrency(99), QueueConflictResolution.ALWAYS_UPDATE);
+
+ var q =
+ dbos.listQueues().stream()
+ .filter(x -> x.name().equals("q-always"))
+ .findFirst()
+ .orElseThrow();
+ assertEquals(99, q.concurrency());
+ }
+
+ @Test
+ public void testDynamicQueuePollingInterval() throws Exception {
+ dbos.launch();
+
+ var interval = Duration.ofSeconds(3);
+ dbos.registerQueue("q-poll", QueueOptions.setPollingInterval(interval));
+
+ var q =
+ dbos.listQueues().stream().filter(x -> x.name().equals("q-poll")).findFirst().orElseThrow();
+ assertEquals(interval, q.pollingInterval());
+ }
+
+ @Test
+ public void testRegisterInternalQueueThrows() throws Exception {
+ dbos.launch();
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> dbos.registerQueue("_dbos_internal_queue", QueueOptions.empty()));
+ }
+
+ @Test
+ public void testDeleteAndRecreateQueue() throws Exception {
+ ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl());
+ dbos.launch();
+
+ var qs = DBOSTestAccess.getQueueService(dbos);
+ qs.setSpeedupForTest();
+
+ dbos.registerQueue("q-lifecycle", QueueOptions.setConcurrency(5));
+
+ var h1 =
+ dbos.startWorkflow(
+ () -> serviceQ.simpleQWorkflow("first"),
+ new StartWorkflowOptions("lc-wf1").withQueue("q-lifecycle"));
+ assertEquals("firstfirst", h1.getResult());
+
+ dbos.deleteQueue("q-lifecycle");
+ assertFalse(dbos.listQueues().stream().anyMatch(x -> x.name().equals("q-lifecycle")));
+
+ // Wait for the old listener to detect the deletion and remove itself from the
+ // active-listener set. Without this wait the supervisor may not start a fresh
+ // listener for the recreated queue (dbListeningQueues still contains the name).
+ Thread.sleep(500);
+
+ // Recreate with different config.
+ dbos.registerQueue("q-lifecycle", QueueOptions.setConcurrency(2));
+ var recreated =
+ dbos.listQueues().stream()
+ .filter(x -> x.name().equals("q-lifecycle"))
+ .findFirst()
+ .orElseThrow();
+ assertEquals(2, recreated.concurrency());
+
+ var h2 =
+ dbos.startWorkflow(
+ () -> serviceQ.simpleQWorkflow("second"),
+ new StartWorkflowOptions("lc-wf2").withQueue("q-lifecycle"));
+ assertEquals("secondsecond", h2.getResult());
+ }
+
+ @Test
+ public void testStaticAndDynamicQueueSameName() throws Exception {
+ // Static queue registered pre-launch.
+ var staticQ = new Queue("q-shared").withConcurrency(3);
+ dbos.registerQueue(staticQ);
+ ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl());
+ dbos.launch();
+
+ var qs = DBOSTestAccess.getQueueService(dbos);
+ qs.setSpeedupForTest();
+
+ // Register same name as a dynamic queue — supervisor should ignore the DB entry.
+ dbos.registerQueue("q-shared", QueueOptions.setConcurrency(99));
+
+ // Workflow still executes — static listener handles it.
+ var handle =
+ dbos.startWorkflow(
+ () -> serviceQ.simpleQWorkflow("shared"),
+ new StartWorkflowOptions().withQueue("q-shared"));
+ assertEquals("sharedshared", handle.getResult());
+ assertEquals(WorkflowState.SUCCESS, handle.getStatus().status());
+ }
+
+ @Test
+ public void testDynamicConcurrencyTakesEffect() throws Exception {
+ ConcurrencyTestServiceImpl impl = new ConcurrencyTestServiceImpl();
+ ConcurrencyTestService service = dbos.registerProxy(ConcurrencyTestService.class, impl);
+ dbos.launch();
+
+ var qs = DBOSTestAccess.getQueueService(dbos);
+ qs.setSpeedupForTest();
+
+ // Start with concurrency=1 so only one workflow dequeues at a time.
+ dbos.registerQueue("dyn-update-q", QueueOptions.setConcurrency(1));
+
+ var h1 =
+ dbos.startWorkflow(
+ () -> service.blockedWorkflow(0),
+ new StartWorkflowOptions("dyn-wf1").withQueue("dyn-update-q"));
+ var h2 =
+ dbos.startWorkflow(
+ () -> service.blockedWorkflow(1),
+ new StartWorkflowOptions("dyn-wf2").withQueue("dyn-update-q"));
+ var h3 =
+ dbos.startWorkflow(
+ () -> service.blockedWorkflow(2),
+ new StartWorkflowOptions("dyn-wf3").withQueue("dyn-update-q"));
+
+ // Wait for exactly one workflow to be dequeued and start running.
+ impl.wfSemaphore.acquire(1);
+
+ // With concurrency=1 the other two should still be waiting.
+ Thread.sleep(200);
+ assertEquals(WorkflowState.ENQUEUED, h2.getStatus().status());
+ assertEquals(WorkflowState.ENQUEUED, h3.getStatus().status());
+
+ // Bump concurrency. The runner reloads queue settings on its next poll and
+ // should immediately dequeue the remaining two workflows.
+ dbos.updateQueue("dyn-update-q", QueueOptions.setConcurrency(3));
+ impl.wfSemaphore.acquire(2);
+
+ // Release all blocked workflows and verify they complete successfully.
+ impl.latch.countDown();
+ assertEquals(0, h1.getResult());
+ assertEquals(1, h2.getResult());
+ assertEquals(2, h3.getResult());
+ }
+
+ @Test
+ public void testDynamicQueueMapUpdatedOnRegister() throws Exception {
+ dbos.launch();
+ var qs = DBOSTestAccess.getQueueService(dbos);
+
+ assertFalse(qs.findDynamicQueue("q-map-reg").isPresent());
+
+ dbos.registerQueue("q-map-reg", QueueOptions.setConcurrency(5));
+ awaitCondition(() -> qs.findDynamicQueue("q-map-reg").isPresent());
+
+ assertEquals(5, qs.findDynamicQueue("q-map-reg").get().concurrency());
+ }
+
+ @Test
+ public void testDynamicQueueMapUpdatedOnUpdate() throws Exception {
+ dbos.launch();
+ var qs = DBOSTestAccess.getQueueService(dbos);
+
+ dbos.registerQueue("q-map-upd", QueueOptions.setConcurrency(5));
+ awaitCondition(() -> qs.findDynamicQueue("q-map-upd").isPresent());
+
+ dbos.updateQueue("q-map-upd", QueueOptions.setConcurrency(10));
+ awaitCondition(
+ () ->
+ qs.findDynamicQueue("q-map-upd")
+ .filter(q -> Integer.valueOf(10).equals(q.concurrency()))
+ .isPresent());
+
+ assertEquals(10, qs.findDynamicQueue("q-map-upd").get().concurrency());
+ }
+
+ @Test
+ public void testDynamicQueueMapUpdatedOnDelete() throws Exception {
+ dbos.launch();
+ var qs = DBOSTestAccess.getQueueService(dbos);
+
+ dbos.registerQueue("q-map-del", QueueOptions.empty());
+ awaitCondition(() -> qs.findDynamicQueue("q-map-del").isPresent());
+
+ dbos.deleteQueue("q-map-del");
+ awaitCondition(() -> qs.findDynamicQueue("q-map-del").isEmpty());
+ }
+
+ @Test
+ public void testRegisterQueueValidation() throws Exception {
+ dbos.launch();
+
+ // Zero or negative polling interval should fail.
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ dbos.registerQueue(
+ "q-bad-poll", QueueOptions.setPollingInterval(Duration.ofSeconds(-1))));
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> dbos.registerQueue("q-bad-poll", QueueOptions.setPollingInterval(Duration.ZERO)));
+
+ // Zero or negative concurrency should fail.
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> dbos.registerQueue("q-bad-conc", QueueOptions.setConcurrency(0)));
+ }
+
+ @Test
+ public void testDedupeId() throws Exception {
+
+ ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl());
+ dbos.launch();
+
+ var qs = DBOSTestAccess.getQueueService(dbos);
+ qs.setSpeedupForTest();
+ dbos.registerQueue("firstQueue", QueueOptions.empty());
+
+ // pause queue service for test validation
+ qs.pause();
+
+ var options = new StartWorkflowOptions().withQueue("firstQueue");
+ var dedupeId = "dedupeId";
+ var h1 =
+ dbos.startWorkflow(
+ () -> serviceQ.simpleQWorkflow("abc"), options.withDeduplicationId(dedupeId));
+ var s1 = h1.getStatus();
+ assertEquals(s1.queueName(), "firstQueue");
+ assertEquals(s1.deduplicationId(), dedupeId);
+
+ // enqueue with different dedupe ID should be fine
+ var dedupeId2 = "different-dedupeId";
+ var h2 =
+ dbos.startWorkflow(
+ () -> serviceQ.simpleQWorkflow("def"), options.withDeduplicationId(dedupeId2));
+ var s2 = h2.getStatus();
+ assertEquals(s2.queueName(), "firstQueue");
+ assertEquals(s2.deduplicationId(), dedupeId2);
+
+ // enqueue with no dedupe ID should be fine
+ var h3 = dbos.startWorkflow(() -> serviceQ.simpleQWorkflow("ghi"), options);
+ var s3 = h3.getStatus();
+ assertEquals(s3.queueName(), "firstQueue");
+ assertNull(s3.deduplicationId());
+
+ assertThrows(
+ RuntimeException.class,
+ () ->
+ dbos.startWorkflow(
+ () -> serviceQ.simpleQWorkflow("jkl"), options.withDeduplicationId(dedupeId)));
+
+ // enable queue service to run
+ qs.unpause();
+
+ // wait for initial workflow with initial dedupe ID to finish
+ h1.getResult();
+ h2.getResult();
+ h3.getResult();
+
+ var h4 =
+ dbos.startWorkflow(
+ () -> serviceQ.simpleQWorkflow("jkl"), options.withDeduplicationId(dedupeId));
+ h4.getResult();
+
+ var rows = DBUtils.getWorkflowRows(dataSource);
+ assertEquals(4, rows.size());
+
+ for (var row : rows) {
+ assertEquals(WorkflowState.SUCCESS.name(), row.status());
+ assertEquals("firstQueue", row.queueName());
+ assertNull(row.deduplicationId());
+ }
+ }
+
+ @Test
+ public void testDedupeIdWithDelay() throws Exception {
+
+ ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl());
+ dbos.launch();
+
+ var qs = DBOSTestAccess.getQueueService(dbos);
+ qs.setSpeedupForTest();
+ dbos.registerQueue("firstQueue", QueueOptions.empty());
+
+ qs.pause();
+
+ var dedupeId = "dedupeId";
+ var options = new StartWorkflowOptions().withQueue("firstQueue").withDeduplicationId(dedupeId);
+ var h1 =
+ dbos.startWorkflow(
+ () -> serviceQ.simpleQWorkflow("abc"), options.withDelay(Duration.ofHours(1)));
+ var s1 = h1.getStatus();
+ assertEquals(WorkflowState.DELAYED, s1.status());
+ assertEquals(dedupeId, s1.deduplicationId());
+
+ // Same dedupe ID should conflict even while DELAYED
+ assertThrows(
+ RuntimeException.class,
+ () -> dbos.startWorkflow(() -> serviceQ.simpleQWorkflow("def"), options));
+
+ // Clear the delay and run
+ dbos.setWorkflowDelay(h1.workflowId(), Instant.now().minusSeconds(1));
+ qs.unpause();
+ h1.getResult();
+
+ // After completion the dedupe ID is released — re-enqueue should succeed
+ var h2 = dbos.startWorkflow(() -> serviceQ.simpleQWorkflow("ghi"), options);
+ h2.getResult();
+ }
+
+ @Test
+ public void testPriority() throws Exception {
+
+ ServiceQImpl impl = new ServiceQImpl();
+ ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, impl);
+ dbos.launch();
+
+ var qs = DBOSTestAccess.getQueueService(dbos);
+ qs.setSpeedupForTest();
+ dbos.registerQueue(
+ "firstQueue",
+ QueueOptions.setPriorityEnabled(true).andConcurrency(1).andWorkerConcurrency(1));
+
+ qs.pause();
+
+ var o1 = new StartWorkflowOptions().withQueue("firstQueue").withPriority(100);
+ var h1 = dbos.startWorkflow(() -> serviceQ.priorityWorkflow(100), o1);
+
+ var o2 = new StartWorkflowOptions().withQueue("firstQueue").withPriority(50);
+ var h2 = dbos.startWorkflow(() -> serviceQ.priorityWorkflow(50), o2);
+
+ var o3 = new StartWorkflowOptions().withQueue("firstQueue").withPriority(10);
+ var h3 = dbos.startWorkflow(() -> serviceQ.priorityWorkflow(10), o3);
+
+ qs.unpause();
+
+ h1.getResult();
+ h2.getResult();
+ h3.getResult();
+
+ assertEquals(3, impl.queue.size());
+ assertEquals(10, impl.queue.remove());
+ assertEquals(50, impl.queue.remove());
+ assertEquals(100, impl.queue.remove());
+ }
+
+ @Test
+ public void testQueuedMultipleWorkflows() throws Exception {
+
+ ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl());
+ dbos.launch();
+
+ var qs = DBOSTestAccess.getQueueService(dbos);
+ qs.setSpeedupForTest();
+ dbos.registerQueue("firstQueue", QueueOptions.setConcurrency(1).andWorkerConcurrency(1));
+
+ qs.pause();
+ Thread.sleep(2000);
+
+ for (int i = 0; i < 5; i++) {
+ String id = "wfid" + i;
+ var input = "inputq" + i;
+ dbos.startWorkflow(
+ () -> serviceQ.simpleQWorkflow(input),
+ new StartWorkflowOptions(id).withQueue("firstQueue"));
+ }
+
+ var input = new ListWorkflowsInput().withQueuesOnly(true).withLoadInput(true);
+ List wfs = dbos.listWorkflows(input);
+
+ for (int i = 0; i < 5; i++) {
+ String id = "wfid" + i;
+
+ assertEquals(id, wfs.get(i).workflowId());
+ assertEquals(WorkflowState.ENQUEUED, wfs.get(i).status());
+ }
+
+ qs.unpause();
+
+ for (int i = 0; i < 5; i++) {
+ String id = "wfid" + i;
+
+ var handle = dbos.retrieveWorkflow(id);
+ assertEquals(id, handle.workflowId());
+ String result = (String) handle.getResult();
+ assertEquals("inputq" + i + "inputq" + i, result);
+ assertEquals(WorkflowState.SUCCESS, handle.getStatus().status());
+ }
+ }
+
+ @Test
+ void testListQueuedWorkflow() throws Exception {
+
+ ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl());
+ dbos.launch();
+
+ var qs = DBOSTestAccess.getQueueService(dbos);
+ qs.setSpeedupForTest();
+ dbos.registerQueue("firstQueue", QueueOptions.setConcurrency(1).andWorkerConcurrency(1));
+
+ qs.pause();
+
+ for (int i = 0; i < 5; i++) {
+ String id = "wfid" + i;
+ var input = "inputq" + i;
+ dbos.startWorkflow(
+ () -> serviceQ.simpleQWorkflow(input),
+ new StartWorkflowOptions(id).withQueue("firstQueue"));
+ Thread.sleep(100);
+ }
+
+ var input = new ListWorkflowsInput().withQueuesOnly(true).withLoadInput(true);
+ List wfs = dbos.listWorkflows(input);
+ wfs.sort(
+ (a, b) -> {
+ return a.workflowId().compareTo(b.workflowId());
+ });
+
+ for (int i = 0; i < 5; i++) {
+ String id = "wfid" + i;
+
+ assertEquals(id, wfs.get(i).workflowId());
+ assertEquals(WorkflowState.ENQUEUED, wfs.get(i).status());
+ }
+
+ wfs = dbos.listWorkflows(input.withQueueName("abc"));
+ assertEquals(0, wfs.size());
+
+ wfs = dbos.listWorkflows(input.withQueueName("firstQueue"));
+ assertEquals(5, wfs.size());
+
+ wfs = dbos.listWorkflows(input.withEndTime(Instant.now().minus(10, ChronoUnit.SECONDS)));
+ assertEquals(0, wfs.size());
+ }
+
+ @Test
+ public void multipleQueues() throws Exception {
+
+ ServiceQ serviceQ1 = dbos.registerProxy(ServiceQ.class, new ServiceQImpl());
+ ServiceI serviceI = dbos.registerProxy(ServiceI.class, new ServiceIImpl());
+ dbos.launch();
+
+ var qs = DBOSTestAccess.getQueueService(dbos);
+ qs.setSpeedupForTest();
+ dbos.registerQueue("firstQueue", QueueOptions.setConcurrency(1).andWorkerConcurrency(1));
+ dbos.registerQueue("secondQueue", QueueOptions.setConcurrency(1).andWorkerConcurrency(1));
+
+ String id1 = "firstQ1234";
+ String id2 = "second1234";
+
+ var options1 = new StartWorkflowOptions(id1).withQueue("firstQueue");
+ WorkflowHandle handle1 =
+ dbos.startWorkflow(() -> serviceQ1.simpleQWorkflow("firstinput"), options1);
+
+ var options2 = new StartWorkflowOptions(id2).withQueue("secondQueue");
+ WorkflowHandle handle2 = dbos.startWorkflow(() -> serviceI.workflowI(25), options2);
+
+ assertEquals(id1, handle1.workflowId());
+ String result = handle1.getResult();
+ assertEquals("firstQueue", handle1.getStatus().queueName());
+ assertEquals("firstinputfirstinput", result);
+ assertEquals(WorkflowState.SUCCESS, handle1.getStatus().status());
+
+ assertEquals(id2, handle2.workflowId());
+ Integer result2 = (Integer) handle2.getResult();
+ assertEquals("secondQueue", handle2.getStatus().queueName());
+ assertEquals(50, result2);
+ assertEquals(WorkflowState.SUCCESS, handle2.getStatus().status());
+ }
+
+ @Test
+ public void testLimiter() throws Exception {
+
+ int limit = 5;
+ double periodSec = 1.8;
+ Duration period = Duration.ofMillis((long) (periodSec * 1000));
+
+ ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl());
+ dbos.launch();
+
+ var qs = DBOSTestAccess.getQueueService(dbos);
+ qs.setSpeedupForTest();
+ dbos.registerQueue(
+ "limitQueue",
+ QueueOptions.setRateLimit(limit, period).andConcurrency(1).andWorkerConcurrency(1));
+ Thread.sleep(1000);
+
+ int numWaves = 3;
+ int numTasks = numWaves * limit;
+ List> handles = new ArrayList<>();
+ List times = new ArrayList<>();
+
+ for (int i = 0; i < numTasks; i++) {
+ String id = "id" + i;
+ var options = new StartWorkflowOptions(id).withQueue("limitQueue");
+ WorkflowHandle handle =
+ dbos.startWorkflow(() -> serviceQ.limitWorkflow("abc", "123"), options);
+ handles.add(handle);
+ }
+
+ for (WorkflowHandle h : handles) {
+ double result = h.getResult();
+ logger.info(String.valueOf(result));
+ times.add(result);
+ }
+
+ double waveTolerance = 1.0;
+ for (int wave = 0; wave < numWaves; wave++) {
+ for (int i = wave * limit; i < (wave + 1) * limit - 1; i++) {
+ double diff = times.get(i + 1) - times.get(i);
+ logger.info(String.format("Wave %d, Task %d-%d: Time diff %.3f", wave, i, i + 1, diff));
+ assertTrue(
+ diff < waveTolerance,
+ String.format(
+ "Wave %d: Tasks %d and %d should start close together. Diff: %.3f",
+ wave, i, i + 1, diff));
+ }
+ }
+ logger.info("Verified intra-wave timing.");
+
+ double periodTolerance = 0.5;
+ for (int wave = 0; wave < numWaves - 1; wave++) {
+ double startOfNextWave = times.get(limit * (wave + 1));
+ double startOfCurrentWave = times.get(limit * wave);
+ double gap = startOfNextWave - startOfCurrentWave;
+ logger.info(String.format("Gap between Wave %d and %d: %.3f", wave, wave + 1, gap));
+ assertTrue(
+ gap > periodSec - periodTolerance,
+ String.format(
+ "Gap between wave %d and %d should be at least %.3f. Actual: %.3f",
+ wave, wave + 1, periodSec - periodTolerance, gap));
+ assertTrue(
+ gap < periodSec + periodTolerance,
+ String.format(
+ "Gap between wave %d and %d should be at most %.3f. Actual: %.3f",
+ wave, wave + 1, periodSec + periodTolerance, gap));
+ }
+
+ for (WorkflowHandle h : handles) {
+ assertEquals(WorkflowState.SUCCESS, h.getStatus().status());
+ }
+ }
+
+ @Test
+ public void testWorkerConcurrency() throws Exception {
+
+ dbos.launch();
+ var systemDatabase = DBOSTestAccess.getSystemDatabase(dbos);
+ var dbosExecutor = DBOSTestAccess.getDbosExecutor(dbos);
+ var queueService = DBOSTestAccess.getQueueService(dbos);
+
+ dbos.registerQueue("QwithWCLimit", QueueOptions.setConcurrency(3).andWorkerConcurrency(2));
+ Queue qwithWCLimit = dbos.findQueue("QwithWCLimit").get();
+
+ String executorId = dbosExecutor.executorId();
+ String appVersion = dbosExecutor.appVersion();
+
+ queueService.close();
+ while (!queueService.isStopped()) {
+ Thread.sleep(2000);
+ logger.info("Waiting for queueService to stop");
+ }
+
+ var serArgs = SerializationUtil.serializeValue(new Object[] {"ORD-12345"}, null, null);
+ var builder =
+ new WorkflowStatusInternalBuilder()
+ .workflowName("OrderProcessingWorkflow")
+ .className("com.example.workflows.OrderWorkflow")
+ .instanceName("prod-config")
+ .authenticatedUser("user123@example.com")
+ .assumedRole("admin")
+ .authenticatedRoles(new String[] {"admin", "operator"})
+ .queueName("QwithWCLimit")
+ .executorId(executorId)
+ .appVersion(appVersion)
+ .appId("order-app-123")
+ .timeout(Duration.ofMillis(300000))
+ .deadline(Instant.ofEpochMilli(System.currentTimeMillis() + 2400000))
+ .priority(1)
+ .inputs(serArgs.serializedValue());
+
+ for (int i = 0; i < 4; i++) {
+ String wfid = "id" + i;
+ var status = builder.workflowId(wfid).deduplicationId("dedup" + i).build();
+ systemDatabase.initWorkflowStatus(status, null, false, false);
+ }
+
+ var readBack = systemDatabase.listWorkflows(new ListWorkflowsInput("id0")).get(0);
+ assertArrayEquals(new String[] {"admin", "operator"}, readBack.authenticatedRoles());
+
+ List idsToRun =
+ systemDatabase.getAndStartQueuedWorkflows(qwithWCLimit, executorId, appVersion, null);
+
+ assertEquals(2, idsToRun.size());
+
+ // run the same above 2 are in Pending.
+ // So no de queueing
+ idsToRun =
+ systemDatabase.getAndStartQueuedWorkflows(qwithWCLimit, executorId, appVersion, null);
+ assertEquals(0, idsToRun.size());
+
+ // mark the first 2 as success
+ DBUtils.updateAllWorkflowStates(
+ dataSource, WorkflowState.PENDING.name(), WorkflowState.SUCCESS.name());
+
+ // next 2 get dequeued
+ idsToRun =
+ systemDatabase.getAndStartQueuedWorkflows(qwithWCLimit, executorId, appVersion, null);
+ assertEquals(2, idsToRun.size());
+
+ DBUtils.updateAllWorkflowStates(
+ dataSource, WorkflowState.PENDING.name(), WorkflowState.SUCCESS.name());
+ idsToRun =
+ systemDatabase.getAndStartQueuedWorkflows(
+ qwithWCLimit, Constants.DEFAULT_EXECUTORID, Constants.DEFAULT_APP_VERSION, null);
+ assertEquals(0, idsToRun.size());
+ }
+
+ @Test
+ public void testGlobalConcurrency() throws Exception {
+
+ dbos.launch();
+ var systemDatabase = DBOSTestAccess.getSystemDatabase(dbos);
+ var dbosExecutor = DBOSTestAccess.getDbosExecutor(dbos);
+ var queueService = DBOSTestAccess.getQueueService(dbos);
+
+ dbos.registerQueue("QwithWCLimit", QueueOptions.setConcurrency(3).andWorkerConcurrency(2));
+ Queue qwithWCLimit = dbos.findQueue("QwithWCLimit").get();
+
+ String executorId = dbosExecutor.executorId();
+ String appVersion = dbosExecutor.appVersion();
+
+ queueService.close();
+ while (!queueService.isStopped()) {
+ Thread.sleep(2000);
+ logger.info("Waiting for queueService to stop");
+ }
+
+ var builder =
+ new WorkflowStatusInternalBuilder()
+ .workflowName("OrderProcessingWorkflow")
+ .className("com.example.workflows.OrderWorkflow")
+ .instanceName("prod-config")
+ .authenticatedUser("user123@example.com")
+ .assumedRole("admin")
+ .authenticatedRoles(new String[] {"admin", "operator"})
+ .queueName("QwithWCLimit")
+ .executorId(executorId)
+ .appVersion(appVersion)
+ .appId("order-app-123")
+ .timeout(Duration.ofMillis(300000))
+ .deadline(Instant.ofEpochMilli(System.currentTimeMillis() + 2400000))
+ .priority(1)
+ .inputs("{\"orderId\":\"ORD-12345\"}");
+
+ // executor1
+ for (int i = 0; i < 2; i++) {
+ String wfid = "id" + i;
+ var status = builder.workflowId(wfid).deduplicationId("dedup" + i).build();
+ systemDatabase.initWorkflowStatus(status, null, false, false);
+ }
+
+ // executor2
+ String executor2 = "remote";
+ for (int i = 2; i < 5; i++) {
+ String wfid = "id" + i;
+ var status =
+ builder.workflowId(wfid).deduplicationId("dedup" + i).executorId(executor2).build();
+ systemDatabase.initWorkflowStatus(status, null, false, false);
+
+ DBUtils.setWorkflowState(dataSource, wfid, WorkflowState.PENDING.name());
+ }
+
+ List idsToRun =
+ systemDatabase.getAndStartQueuedWorkflows(qwithWCLimit, executorId, appVersion, null);
+ // 0 because global concurrency limit is reached
+ assertEquals(0, idsToRun.size());
+
+ DBUtils.updateAllWorkflowStates(
+ dataSource, WorkflowState.PENDING.name(), WorkflowState.SUCCESS.name());
+ idsToRun =
+ systemDatabase.getAndStartQueuedWorkflows(
+ qwithWCLimit,
+ // executorId,
+ executor2,
+ appVersion,
+ null);
+ assertEquals(2, idsToRun.size());
+ }
+
+ @Test
+ public void testenQueueWF() throws Exception {
+
+ ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl());
+ dbos.launch();
+
+ var qs = DBOSTestAccess.getQueueService(dbos);
+ qs.setSpeedupForTest();
+ dbos.registerQueue("firstQueue", QueueOptions.empty());
+
+ String id = "q1234";
+
+ var option = new StartWorkflowOptions(id).withQueue("firstQueue");
+ WorkflowHandle handle =
+ dbos.startWorkflow(() -> serviceQ.simpleQWorkflow("inputq"), option);
+
+ assertEquals(id, handle.workflowId());
+ String result = handle.getResult();
+ assertEquals("inputqinputq", result);
+ }
+
+ @Test
+ public void testQueueConcurrencyUnderRecovery() throws Exception {
+ ConcurrencyTestServiceImpl impl = new ConcurrencyTestServiceImpl();
+ ConcurrencyTestService service = dbos.registerProxy(ConcurrencyTestService.class, impl);
+ dbos.launch();
+
+ var qs = DBOSTestAccess.getQueueService(dbos);
+ qs.setSpeedupForTest();
+ dbos.registerQueue("test_queue", QueueOptions.setConcurrency(2));
+
+ var opt1 = new StartWorkflowOptions("wf1").withQueue("test_queue");
+ var handle1 = dbos.startWorkflow(() -> service.blockedWorkflow(0), opt1);
+
+ var opt2 = new StartWorkflowOptions("wf2").withQueue("test_queue");
+ var handle2 = dbos.startWorkflow(() -> service.blockedWorkflow(1), opt2);
+
+ var opt3 = new StartWorkflowOptions("wf3").withQueue("test_queue");
+ var handle3 = dbos.startWorkflow(() -> service.noopWorkflow(2), opt3);
+
+ // each call to blockedWorkflow releases the semaphore once,
+ // so block waiting on both calls to release
+ impl.wfSemaphore.acquire(2);
+
+ assertEquals(2, impl.counter.get());
+ assertEquals(WorkflowState.PENDING, handle1.getStatus().status());
+ assertEquals(WorkflowState.PENDING, handle2.getStatus().status());
+ assertEquals(WorkflowState.ENQUEUED, handle3.getStatus().status());
+
+ // update WF3 to appear as if it's from a different executor
+ String sql =
+ "UPDATE dbos.workflow_status SET status = ?, executor_id = ? where workflow_uuid = ?;";
+
+ try (Connection connection = DBUtils.getConnection(dbosConfig);
+ PreparedStatement pstmt = connection.prepareStatement(sql)) {
+
+ pstmt.setString(1, WorkflowState.PENDING.name());
+ pstmt.setString(2, "other");
+ pstmt.setString(3, opt3.workflowId());
+
+ // Execute the update and get the number of rows affected
+ int rowsAffected = pstmt.executeUpdate();
+ assertEquals(1, rowsAffected);
+ }
+
+ var executor = DBOSTestAccess.getDbosExecutor(dbos);
+ List> otherHandles = executor.recoverPendingWorkflows(List.of("other"));
+ assertEquals(WorkflowState.PENDING, handle1.getStatus().status());
+ assertEquals(WorkflowState.PENDING, handle2.getStatus().status());
+ assertEquals(1, otherHandles.size());
+ assertEquals(otherHandles.get(0).workflowId(), handle3.workflowId());
+ assertEquals(WorkflowState.ENQUEUED, handle3.getStatus().status());
+
+ // Pause the listener before recovery so it can't race the ENQUEUED status checks below.
+ qs.pause();
+ List> localHandles = executor.recoverPendingWorkflows(List.of("local"));
+ assertEquals(2, localHandles.size());
+ List expectedWorkflowIds = List.of(handle1.workflowId(), handle2.workflowId());
+ assertTrue(expectedWorkflowIds.contains(localHandles.get(0).workflowId()));
+ assertTrue(expectedWorkflowIds.contains(localHandles.get(1).workflowId()));
+
+ assertEquals(2, impl.counter.get());
+ // Recovery sets back to enqueued.
+ // The enqueued run will get skipped (first run is still blocked)
+ assertEquals(WorkflowState.ENQUEUED, handle1.getStatus().status());
+ assertEquals(WorkflowState.ENQUEUED, handle2.getStatus().status());
+ assertEquals(WorkflowState.ENQUEUED, handle3.getStatus().status());
+
+ qs.unpause();
+ impl.latch.countDown();
+ assertEquals(0, handle1.getResult());
+ assertEquals(1, handle2.getResult());
+ assertEquals(2, handle3.getResult());
+ assertEquals("local", handle3.getStatus().executorId());
+
+ assertTrue(DBUtils.queueEntriesAreCleanedUp(dataSource));
+ }
+
+ @Test
+ public void testListenQueue() throws Exception {
+ var config = dbosConfig.withListenQueue("queueOne");
+ try (var dbos = new DBOS(config)) {
+
+ ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl());
+ dbos.launch();
+
+ var qs = DBOSTestAccess.getQueueService(dbos);
+ qs.setSpeedupForTest();
+ dbos.registerQueue("queueOne", QueueOptions.empty());
+ dbos.registerQueue("queueTwo", QueueOptions.empty());
+
+ var h2 =
+ dbos.startWorkflow(
+ () -> serviceQ.simpleQWorkflow("two"),
+ new StartWorkflowOptions().withQueue("queueTwo"));
+ var h1 =
+ dbos.startWorkflow(
+ () -> serviceQ.simpleQWorkflow("one"),
+ new StartWorkflowOptions().withQueue("queueOne"));
+
+ Thread.sleep(3000);
+ assertEquals("oneone", h1.getResult());
+ assertEquals(WorkflowState.ENQUEUED, h2.getStatus().status());
+ }
+ }
+
+ private static void awaitCondition(BooleanSupplier condition) throws InterruptedException {
+ long deadline = System.currentTimeMillis() + 2000;
+ while (!condition.getAsBoolean()) {
+ if (System.currentTimeMillis() > deadline)
+ throw new AssertionError("Condition not met within 2s");
+ Thread.sleep(50);
+ }
+ }
+}
diff --git a/transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java b/transact/src/test/java/dev/dbos/transact/queue/StaticQueuesTest.java
similarity index 96%
rename from transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java
rename to transact/src/test/java/dev/dbos/transact/queue/StaticQueuesTest.java
index 9fdb3590..b4b40468 100644
--- a/transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java
+++ b/transact/src/test/java/dev/dbos/transact/queue/StaticQueuesTest.java
@@ -34,9 +34,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class QueuesTest {
+/**
+ * Tests for the static (in-memory) queue registration path. All queues here are registered via
+ * {@code dbos.registerQueue(Queue)} before launch, which exercises the pre-launch static listener
+ * code path. See {@link QueuesTest} for the equivalent tests using database-backed dynamic queues.
+ */
+public class StaticQueuesTest {
- private static final Logger logger = LoggerFactory.getLogger(QueuesTest.class);
+ private static final Logger logger = LoggerFactory.getLogger(StaticQueuesTest.class);
@AutoClose final PgContainer pgContainer = new PgContainer();
@@ -336,7 +341,8 @@ public void multipleQueues() throws Exception {
public void testLimiter() throws Exception {
int limit = 5;
- double period = 1.8; //
+ double periodSec = 1.8;
+ Duration period = Duration.ofMillis((long) (periodSec * 1000));
Queue limitQ =
new Queue("limitQueue")
@@ -392,15 +398,15 @@ public void testLimiter() throws Exception {
double gap = startOfNextWave - startOfCurrentWave;
logger.info(String.format("Gap between Wave %d and %d: %.3f", wave, wave + 1, gap));
assertTrue(
- gap > period - periodTolerance,
+ gap > periodSec - periodTolerance,
String.format(
"Gap between wave %d and %d should be at least %.3f. Actual: %.3f",
- wave, wave + 1, period - periodTolerance, gap));
+ wave, wave + 1, periodSec - periodTolerance, gap));
assertTrue(
- gap < period + periodTolerance,
+ gap < periodSec + periodTolerance,
String.format(
"Gap between wave %d and %d should be at most %.3f. Actual: %.3f",
- wave, wave + 1, period + periodTolerance, gap));
+ wave, wave + 1, periodSec + periodTolerance, gap));
}
for (WorkflowHandle h : handles) {
diff --git a/transact/src/test/java/dev/dbos/transact/utils/PgContainer.java b/transact/src/test/java/dev/dbos/transact/utils/PgContainer.java
index 6c029791..b623f87d 100644
--- a/transact/src/test/java/dev/dbos/transact/utils/PgContainer.java
+++ b/transact/src/test/java/dev/dbos/transact/utils/PgContainer.java
@@ -76,7 +76,8 @@ public static void truncateDbosTables(Connection conn) throws SQLException {
"dbos".event_dispatch_kv,
"dbos".streams,
"dbos".application_versions,
- "dbos".workflow_schedules
+ "dbos".workflow_schedules,
+ "dbos".queues
CASCADE
""";
try (var stmt = conn.createStatement()) {