diff --git a/transact/src/main/java/dev/dbos/transact/DBOS.java b/transact/src/main/java/dev/dbos/transact/DBOS.java index 6645266e..e00c5dcd 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOS.java +++ b/transact/src/main/java/dev/dbos/transact/DBOS.java @@ -14,6 +14,8 @@ import dev.dbos.transact.workflow.ForkOptions; import dev.dbos.transact.workflow.ListWorkflowsInput; import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.QueueConflictResolution; +import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.ScheduleStatus; import dev.dbos.transact.workflow.SerializationStrategy; import dev.dbos.transact.workflow.Step; @@ -164,6 +166,76 @@ public void registerQueues(@NonNull Queue... queues) { } } + /** + * Register a database-backed dynamic queue. Must be called after launch. Queue configuration can + * be updated at runtime via {@link #updateQueue(String, QueueOptions)}. + * + *

Uses {@link QueueConflictResolution#UPDATE_IF_LATEST_VERSION} by default: the existing + * configuration is overwritten only if this executor is running the latest application version. + * + * @param name Queue name + * @param options Initial configuration options + */ + public void registerQueue(@NonNull String name, @NonNull QueueOptions options) { + ensureLaunched("registerQueue") + .registerDynamicQueue(name, options, QueueConflictResolution.UPDATE_IF_LATEST_VERSION); + } + + /** + * Register a database-backed dynamic queue. Must be called after launch. Queue configuration can + * be updated at runtime via {@link #updateQueue(String, QueueOptions)}. + * + * @param name Queue name + * @param options Initial configuration options + * @param onConflict How to handle an existing queue with the same name + */ + public void registerQueue( + @NonNull String name, + @NonNull QueueOptions options, + @NonNull QueueConflictResolution onConflict) { + ensureLaunched("registerQueue").registerDynamicQueue(name, options, onConflict); + } + + /** + * Update the configuration of a database-backed dynamic queue. Must be called after launch. Only + * fields that are present in {@code options} are written; absent fields are left unchanged. + * + * @param name Queue name + * @param options Fields to update + */ + public void updateQueue(@NonNull String name, @NonNull QueueOptions options) { + ensureLaunched("updateQueue").updateDynamicQueue(name, options); + } + + /** + * Retrieve a database-backed dynamic queue by name. Must be called after launch. + * + * @param name Queue name + * @return the queue if it exists in the database, or empty + */ + public @NonNull Optional findQueue(@NonNull String name) { + return ensureLaunched("findQueue").findDynamicQueue(name); + } + + /** + * Delete a database-backed dynamic queue. Must be called after launch. + * + * @param name Queue name + * @return true if the queue was deleted, false if it did not exist + */ + public boolean deleteQueue(@NonNull String name) { + return ensureLaunched("deleteQueue").deleteDynamicQueue(name); + } + + /** + * List all database-backed dynamic queues. Must be called after launch. + * + * @return list of all queues currently registered in the database + */ + public @NonNull List listQueues() { + return ensureLaunched("listQueues").listDynamicQueues(); + } + /** * Register all workflows and steps in the provided class instance * @@ -311,7 +383,7 @@ private DBOSExecutor ensureLaunched(String caller) { * @return Optional containing the queue definition for given `queueName`, or empty if not found */ public @NonNull Optional getQueue(@NonNull String queueName) { - return ensureLaunched("getQueue").getQueue(queueName); + return ensureLaunched("getQueue").findStaticQueue(queueName); } /** diff --git a/transact/src/main/java/dev/dbos/transact/DBOSClient.java b/transact/src/main/java/dev/dbos/transact/DBOSClient.java index 01fdfed2..a011b206 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOSClient.java +++ b/transact/src/main/java/dev/dbos/transact/DBOSClient.java @@ -13,6 +13,9 @@ import dev.dbos.transact.json.SerializationUtil; import dev.dbos.transact.workflow.ForkOptions; import dev.dbos.transact.workflow.ListWorkflowsInput; +import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.QueueConflictResolution; +import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.ScheduleStatus; import dev.dbos.transact.workflow.SerializationStrategy; import dev.dbos.transact.workflow.StepInfo; @@ -1097,4 +1100,78 @@ public void setWorkflowDelay(@NonNull String workflowId, @NonNull Instant delayU Objects.requireNonNull(delayUntil, "delayUntil must not be null")); systemDatabase.setWorkflowDelay(workflowId, wfDelay); } + + /** + * Register a database-backed dynamic queue. Uses {@link QueueConflictResolution#ALWAYS_UPDATE} by + * default. + * + * @param name Queue name + * @param options Configuration options + */ + public void registerQueue(@NonNull String name, @NonNull QueueOptions options) { + registerQueue(name, options, QueueConflictResolution.ALWAYS_UPDATE); + } + + /** + * Register a database-backed dynamic queue. + * + *

{@link QueueConflictResolution#UPDATE_IF_LATEST_VERSION} is not supported by {@code + * DBOSClient} because clients are not associated with an application version. Use {@link + * QueueConflictResolution#ALWAYS_UPDATE} or {@link QueueConflictResolution#NEVER_UPDATE}. + * + * @param name Queue name + * @param options Configuration options + * @param onConflict How to handle an existing queue with the same name + */ + public void registerQueue( + @NonNull String name, + @NonNull QueueOptions options, + @NonNull QueueConflictResolution onConflict) { + if (onConflict == QueueConflictResolution.UPDATE_IF_LATEST_VERSION) { + throw new IllegalArgumentException( + "DBOSClient.registerQueue does not support UPDATE_IF_LATEST_VERSION because clients are" + + " not associated with an application version. Use ALWAYS_UPDATE or NEVER_UPDATE."); + } + systemDatabase.upsertQueue(name, options, onConflict == QueueConflictResolution.ALWAYS_UPDATE); + } + + /** + * Update the configuration of a database-backed dynamic queue. Only fields that are present in + * {@code options} are written; absent fields are left unchanged. + * + * @param name Queue name + * @param options Fields to update + */ + public void updateQueue(@NonNull String name, @NonNull QueueOptions options) { + systemDatabase.updateQueue(name, options); + } + + /** + * Retrieve a database-backed dynamic queue by name. + * + * @param name Queue name + * @return the queue if it exists in the database, or empty + */ + public @NonNull Optional findQueue(@NonNull String name) { + return systemDatabase.findQueue(name); + } + + /** + * List all database-backed dynamic queues. + * + * @return list of all queues currently registered in the database + */ + public @NonNull List listQueues() { + return systemDatabase.listQueues(); + } + + /** + * Delete a database-backed dynamic queue. + * + * @param name Queue name + * @return true if the queue was deleted, false if it did not exist + */ + public boolean deleteQueue(@NonNull String name) { + return systemDatabase.deleteQueue(name); + } } diff --git a/transact/src/main/java/dev/dbos/transact/admin/AdminServer.java b/transact/src/main/java/dev/dbos/transact/admin/AdminServer.java index 1c1ebb29..2819c92a 100644 --- a/transact/src/main/java/dev/dbos/transact/admin/AdminServer.java +++ b/transact/src/main/java/dev/dbos/transact/admin/AdminServer.java @@ -140,7 +140,7 @@ private void deactivate(HttpExchange exchange) throws IOException { } private void workflowQueuesMetadata(HttpExchange exchange) throws IOException { - var queues = dbosExecutor.getQueues(); + var queues = dbosExecutor.getStaticQueues(); sendMappedJson(exchange, 200, queues); } diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index 40be94cb..e7e2569a 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -6,6 +6,7 @@ import dev.dbos.transact.workflow.ExportedWorkflow; import dev.dbos.transact.workflow.ForkOptions; import dev.dbos.transact.workflow.ListWorkflowsInput; +import dev.dbos.transact.workflow.Queue; import dev.dbos.transact.workflow.StepInfo; import dev.dbos.transact.workflow.WorkflowHandle; import dev.dbos.transact.workflow.WorkflowSchedule; @@ -702,6 +703,7 @@ CompletableFuture getResponseAsync(BaseMessage message, WebSocket case EXPORT_WORKFLOW -> handleExportWorkflow(this, message, ws); case FORK_WORKFLOW -> handleFork(this, message); case GET_METRICS -> handleGetMetrics(this, message); + case GET_QUEUE -> handleGetQueue(this, message); case GET_SCHEDULE -> handleGetSchedule(this, message); case GET_WORKFLOW_AGGREGATES -> handleGetWorkflowAggregates(this, message); case GET_WORKFLOW_EVENTS -> handleGetWorkflowEvents(this, message); @@ -711,6 +713,7 @@ CompletableFuture getResponseAsync(BaseMessage message, WebSocket case IMPORT_WORKFLOW -> handleImportWorkflow(this, message); case LIST_APPLICATION_VERSIONS -> handleListApplicationVersions(this, message); case LIST_QUEUED_WORKFLOWS -> handleListQueuedWorkflows(this, message); + case LIST_QUEUES -> handleListQueues(this, message); case LIST_SCHEDULES -> handleListSchedules(this, message); case LIST_STEPS -> handleListSteps(this, message); case LIST_WORKFLOWS -> handleListWorkflows(this, message); @@ -1371,6 +1374,35 @@ static CompletableFuture handleGetSchedule( }); } + static CompletableFuture handleListQueues( + Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + try { + List queues = conductor.systemDatabase.listQueues(); + List output = queues.stream().map(QueueOutput::from).toList(); + return new ListQueuesResponse(message, output); + } catch (Exception e) { + logger.error("Exception encountered when listing queues", e); + return new ListQueuesResponse(message, e.getMessage()); + } + }); + } + + static CompletableFuture handleGetQueue(Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + GetQueueRequest request = (GetQueueRequest) message; + try { + var queue = conductor.systemDatabase.findQueue(request.name); + return new GetQueueResponse(request, queue.map(QueueOutput::from).orElse(null)); + } catch (Exception e) { + logger.error("Exception encountered when getting queue {}", request.name, e); + return new GetQueueResponse(request, e.getMessage()); + } + }); + } + static CompletableFuture handlePauseSchedule( Conductor conductor, BaseMessage message) { return CompletableFuture.supplyAsync( diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java index 41835223..97fb7e6e 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java @@ -19,6 +19,7 @@ @JsonSubTypes.Type(value = ExportWorkflowRequest.class, name = "export_workflow"), @JsonSubTypes.Type(value = ForkWorkflowRequest.class, name = "fork_workflow"), @JsonSubTypes.Type(value = GetMetricsRequest.class, name = "get_metrics"), + @JsonSubTypes.Type(value = GetQueueRequest.class, name = "get_queue"), @JsonSubTypes.Type(value = GetScheduleRequest.class, name = "get_schedule"), @JsonSubTypes.Type(value = GetWorkflowAggregatesRequest.class, name = "get_workflow_aggregates"), @JsonSubTypes.Type(value = GetWorkflowEventsRequest.class, name = "get_workflow_events"), @@ -32,6 +33,7 @@ value = ListApplicationVersionsRequest.class, name = "list_application_versions"), @JsonSubTypes.Type(value = ListQueuedWorkflowsRequest.class, name = "list_queued_workflows"), + @JsonSubTypes.Type(value = ListQueuesRequest.class, name = "list_queues"), @JsonSubTypes.Type(value = ListSchedulesRequest.class, name = "list_schedules"), @JsonSubTypes.Type(value = ListStepsRequest.class, name = "list_steps"), @JsonSubTypes.Type(value = ListWorkflowsRequest.class, name = "list_workflows"), diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/GetQueueRequest.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/GetQueueRequest.java new file mode 100644 index 00000000..69da526f --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/GetQueueRequest.java @@ -0,0 +1,10 @@ +package dev.dbos.transact.conductor.protocol; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class GetQueueRequest extends BaseMessage { + public String name; + + public GetQueueRequest() {} +} diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/GetQueueResponse.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/GetQueueResponse.java new file mode 100644 index 00000000..a31cca2f --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/GetQueueResponse.java @@ -0,0 +1,17 @@ +package dev.dbos.transact.conductor.protocol; + +public class GetQueueResponse extends BaseResponse { + public QueueOutput output; + + public GetQueueResponse() {} + + public GetQueueResponse(BaseMessage message, QueueOutput output) { + super(message.type, message.request_id); + this.output = output; + } + + public GetQueueResponse(BaseMessage message, String errorMessage) { + super(message.type, message.request_id, errorMessage); + this.output = null; + } +} diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuesRequest.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuesRequest.java new file mode 100644 index 00000000..8e787ecf --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuesRequest.java @@ -0,0 +1,8 @@ +package dev.dbos.transact.conductor.protocol; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class ListQueuesRequest extends BaseMessage { + public ListQueuesRequest() {} +} diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuesResponse.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuesResponse.java new file mode 100644 index 00000000..ed45fe47 --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuesResponse.java @@ -0,0 +1,20 @@ +package dev.dbos.transact.conductor.protocol; + +import java.util.Collections; +import java.util.List; + +public class ListQueuesResponse extends BaseResponse { + public List output; + + public ListQueuesResponse() {} + + public ListQueuesResponse(BaseMessage message, List output) { + super(message.type, message.request_id); + this.output = output; + } + + public ListQueuesResponse(BaseMessage message, String errorMessage) { + super(message.type, message.request_id, errorMessage); + this.output = Collections.emptyList(); + } +} diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java index a07463ff..957e351e 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java @@ -10,6 +10,7 @@ public enum MessageType { EXPORT_WORKFLOW("export_workflow"), FORK_WORKFLOW("fork_workflow"), GET_METRICS("get_metrics"), + GET_QUEUE("get_queue"), GET_SCHEDULE("get_schedule"), GET_WORKFLOW_AGGREGATES("get_workflow_aggregates"), GET_WORKFLOW_EVENTS("get_workflow_events"), @@ -19,6 +20,7 @@ public enum MessageType { IMPORT_WORKFLOW("import_workflow"), LIST_APPLICATION_VERSIONS("list_application_versions"), LIST_QUEUED_WORKFLOWS("list_queued_workflows"), + LIST_QUEUES("list_queues"), LIST_SCHEDULES("list_schedules"), LIST_STEPS("list_steps"), LIST_WORKFLOWS("list_workflows"), diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/QueueOutput.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/QueueOutput.java new file mode 100644 index 00000000..f8c97dee --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/QueueOutput.java @@ -0,0 +1,27 @@ +package dev.dbos.transact.conductor.protocol; + +import dev.dbos.transact.workflow.Queue; + +public record QueueOutput( + String name, + Integer concurrency, + Integer worker_concurrency, + Integer rate_limit_max, + Double rate_limit_period_sec, + boolean priority_enabled, + boolean partition_queue, + double polling_interval_sec) { + + public static QueueOutput from(Queue q) { + Queue.RateLimit rl = q.rateLimit(); + return new QueueOutput( + q.name(), + q.concurrency(), + q.workerConcurrency(), + rl != null ? rl.limit() : null, + rl != null ? rl.period().toMillis() / 1000.0 : null, + q.priorityEnabled(), + q.partitioningEnabled(), + q.pollingInterval().toMillis() / 1000.0); + } +} diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java index b0d100f3..0e01b1a6 100644 --- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -22,6 +22,7 @@ import dev.dbos.transact.workflow.ListWorkflowsInput; import dev.dbos.transact.workflow.NotificationInfo; import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.ScheduleStatus; import dev.dbos.transact.workflow.StepInfo; import dev.dbos.transact.workflow.VersionInfo; @@ -381,6 +382,30 @@ public List getQueuePartitions(String queueName) { return dbRetry(() -> QueuesDAO.getQueuePartitions(ctx, queueName)); } + public boolean upsertQueue(String name, QueueOptions options, boolean updateExisting) { + if (Constants.DBOS_INTERNAL_QUEUE.equals(name)) { + throw new IllegalArgumentException( + String.format("%s is a reserved queue name", Constants.DBOS_INTERNAL_QUEUE)); + } + return dbRetry(() -> QueuesDAO.upsertQueue(ctx, name, options, updateExisting)); + } + + public void updateQueue(String name, QueueOptions update) { + dbRetry(() -> QueuesDAO.updateQueue(ctx, name, update)); + } + + public Optional findQueue(String name) { + return dbRetry(() -> QueuesDAO.findQueue(ctx, name)); + } + + public List listQueues() { + return dbRetry(() -> QueuesDAO.listQueues(ctx)); + } + + public boolean deleteQueue(String name) { + return dbRetry(() -> QueuesDAO.deleteQueue(ctx, name)); + } + public StepResult checkStepResult(String workflowId, int functionId, String functionName) { return dbRetry( diff --git a/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java b/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java index f1409f99..8dbeadc0 100644 --- a/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java @@ -1,7 +1,9 @@ package dev.dbos.transact.database.dao; import dev.dbos.transact.database.DbContext; +import dev.dbos.transact.workflow.Field; import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.WorkflowState; import java.sql.Connection; @@ -9,11 +11,13 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -293,4 +297,267 @@ public static List getQueuePartitions(DbContext ctx, String queueName) } } } + + /** + * Upsert a queue row. Returns true iff a new row was inserted (i.e. the queue did not previously + * exist). Returns false if the row already existed, regardless of whether it was updated. + */ + public static boolean upsertQueue( + DbContext ctx, String name, QueueOptions options, boolean updateExisting) + throws SQLException { + Queue queue = queueFromOptions(name, options); + final String insertSql = + """ + INSERT INTO "%s".queues + (name, concurrency, worker_concurrency, rate_limit_max, rate_limit_period_sec, + priority_enabled, partition_queue, polling_interval_sec, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (name) DO NOTHING + """ + .formatted(ctx.schema()); + final String updateSql = + """ + UPDATE "%s".queues SET + concurrency = ?, + worker_concurrency = ?, + rate_limit_max = ?, + rate_limit_period_sec = ?, + priority_enabled = ?, + partition_queue = ?, + polling_interval_sec = ?, + updated_at = ? + WHERE name = ? + """ + .formatted(ctx.schema()); + + try (Connection connection = ctx.getConnection()) { + boolean inserted; + try (PreparedStatement ps = connection.prepareStatement(insertSql)) { + bindQueueParams(ps, queue, 1); + inserted = ps.executeUpdate() == 1; + } + if (!inserted && updateExisting) { + try (PreparedStatement ps = connection.prepareStatement(updateSql)) { + setNullableInt(ps, 1, queue.concurrency()); + setNullableInt(ps, 2, queue.workerConcurrency()); + var rateLimit = queue.rateLimit(); + if (rateLimit != null) { + ps.setInt(3, rateLimit.limit()); + ps.setDouble(4, rateLimit.period().toMillis() / 1000.0); + } else { + ps.setNull(3, java.sql.Types.INTEGER); + ps.setNull(4, java.sql.Types.DOUBLE); + } + ps.setBoolean(5, queue.priorityEnabled()); + ps.setBoolean(6, queue.partitioningEnabled()); + ps.setDouble(7, queue.pollingInterval().toMillis() / 1000.0); + ps.setLong(8, System.currentTimeMillis()); + ps.setString(9, queue.name()); + ps.executeUpdate(); + } + } + return inserted; + } + } + + private static void bindQueueParams(PreparedStatement ps, Queue queue, int offset) + throws SQLException { + ps.setString(offset, queue.name()); + setNullableInt(ps, offset + 1, queue.concurrency()); + setNullableInt(ps, offset + 2, queue.workerConcurrency()); + var rateLimit = queue.rateLimit(); + if (rateLimit != null) { + ps.setInt(offset + 3, rateLimit.limit()); + ps.setDouble(offset + 4, rateLimit.period().toMillis() / 1000.0); + } else { + ps.setNull(offset + 3, java.sql.Types.INTEGER); + ps.setNull(offset + 4, java.sql.Types.DOUBLE); + } + ps.setBoolean(offset + 5, queue.priorityEnabled()); + ps.setBoolean(offset + 6, queue.partitioningEnabled()); + ps.setDouble(offset + 7, queue.pollingInterval().toMillis() / 1000.0); + ps.setLong(offset + 8, System.currentTimeMillis()); + } + + public static Optional findQueue(DbContext ctx, String name) throws SQLException { + final String sql = + """ + SELECT name, concurrency, worker_concurrency, + rate_limit_max, rate_limit_period_sec, + priority_enabled, partition_queue, polling_interval_sec + FROM "%s".queues + WHERE name = ? + """ + .formatted(ctx.schema()); + + try (Connection connection = ctx.getConnection(); + PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setString(1, name); + try (ResultSet rs = stmt.executeQuery()) { + if (rs.next()) { + return Optional.of(queueFromResultSet(rs)); + } + return Optional.empty(); + } + } + } + + public static List listQueues(DbContext ctx) throws SQLException { + final String sql = + """ + SELECT name, concurrency, worker_concurrency, + rate_limit_max, rate_limit_period_sec, + priority_enabled, partition_queue, polling_interval_sec + FROM "%s".queues + ORDER BY name + """ + .formatted(ctx.schema()); + + try (Connection connection = ctx.getConnection(); + PreparedStatement stmt = connection.prepareStatement(sql); + ResultSet rs = stmt.executeQuery()) { + List queues = new ArrayList<>(); + while (rs.next()) { + queues.add(queueFromResultSet(rs)); + } + return queues; + } + } + + public static void updateQueue(DbContext ctx, String name, QueueOptions update) + throws SQLException { + if (update.isEmpty()) return; + + List setClauses = new ArrayList<>(); + List params = new ArrayList<>(); + + collectField(setClauses, params, "concurrency", update.concurrency()); + collectField(setClauses, params, "worker_concurrency", update.workerConcurrency()); + collectField(setClauses, params, "rate_limit_max", update.rateLimitMax()); + collectField( + setClauses, params, "rate_limit_period_sec", durationToSec(update.rateLimitPeriod())); + collectOptional(setClauses, params, "priority_enabled", update.priorityEnabled()); + collectOptional(setClauses, params, "partition_queue", update.partitionQueue()); + collectOptional( + setClauses, params, "polling_interval_sec", durationToSec(update.pollingInterval())); + + setClauses.add("\"updated_at\" = ?"); + params.add(System.currentTimeMillis()); + params.add(name); + + String sql = + "UPDATE \"%s\".queues SET %s WHERE name = ?" + .formatted(ctx.schema(), String.join(", ", setClauses)); + + try (Connection connection = ctx.getConnection(); + PreparedStatement ps = connection.prepareStatement(sql)) { + for (int i = 0; i < params.size(); i++) { + ps.setObject(i + 1, params.get(i)); + } + ps.executeUpdate(); + } + } + + private static void collectField( + List clauses, List params, String column, Field field) { + if (field.isPresent()) { + clauses.add("\"" + column + "\" = ?"); + params.add(field.get()); + } + } + + private static void collectOptional( + List clauses, List params, String column, Optional opt) { + opt.ifPresent( + value -> { + clauses.add("\"" + column + "\" = ?"); + params.add(value); + }); + } + + private static Field durationToSec(Field field) { + if (!field.isPresent()) return Field.absent(); + Duration d = field.get(); + return Field.of(d != null ? d.toMillis() / 1000.0 : null); + } + + private static Optional durationToSec(Optional opt) { + return opt.map(d -> d.toMillis() / 1000.0); + } + + public static boolean deleteQueue(DbContext ctx, String name) throws SQLException { + final String sql = "DELETE FROM \"%s\".queues WHERE name = ?".formatted(ctx.schema()); + + try (Connection connection = ctx.getConnection(); + PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setString(1, name); + return stmt.executeUpdate() > 0; + } + } + + private static Queue queueFromResultSet(ResultSet rs) throws SQLException { + String name = rs.getString("name"); + Integer concurrency = rs.getObject("concurrency", Integer.class); + Integer workerConcurrency = rs.getObject("worker_concurrency", Integer.class); + Integer rateLimitMax = rs.getObject("rate_limit_max", Integer.class); + Double rateLimitPeriodSec = rs.getObject("rate_limit_period_sec", Double.class); + boolean priorityEnabled = rs.getBoolean("priority_enabled"); + boolean partitioningEnabled = rs.getBoolean("partition_queue"); + Double pollingIntervalSec = rs.getObject("polling_interval_sec", Double.class); + + Queue.RateLimit rateLimit = null; + if (rateLimitMax != null && rateLimitPeriodSec != null) { + rateLimit = + new Queue.RateLimit(rateLimitMax, Duration.ofMillis((long) (rateLimitPeriodSec * 1000))); + } + Duration pollingInterval = + pollingIntervalSec != null + ? Duration.ofMillis((long) (pollingIntervalSec * 1000)) + : Queue.DEFAULT_POLLING_INTERVAL; + return new Queue( + name, + concurrency, + workerConcurrency, + priorityEnabled, + partitioningEnabled, + rateLimit, + pollingInterval); + } + + private static Queue queueFromOptions(String name, QueueOptions options) { + Integer concurrencyVal = options.concurrency().isPresent() ? options.concurrency().get() : null; + Integer workerConcurrencyVal = + options.workerConcurrency().isPresent() ? options.workerConcurrency().get() : null; + boolean priorityEnabledVal = options.priorityEnabled().orElse(false); + boolean partitionQueueVal = options.partitionQueue().orElse(false); + + Queue.RateLimit rateLimit = null; + if (options.rateLimitMax().isPresent() + && options.rateLimitPeriod().isPresent() + && options.rateLimitMax().get() != null + && options.rateLimitPeriod().get() != null) { + rateLimit = + new Queue.RateLimit(options.rateLimitMax().get(), options.rateLimitPeriod().get()); + } + + Duration pollingIntervalVal = options.pollingInterval().orElse(Queue.DEFAULT_POLLING_INTERVAL); + + return new Queue( + name, + concurrencyVal, + workerConcurrencyVal, + priorityEnabledVal, + partitionQueueVal, + rateLimit, + pollingIntervalVal); + } + + private static void setNullableInt(PreparedStatement stmt, int index, Integer value) + throws SQLException { + if (value != null) { + stmt.setInt(index, value); + } else { + stmt.setNull(index, java.sql.Types.INTEGER); + } + } } diff --git a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java index ba1daa6e..0a5f5f6b 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java +++ b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java @@ -30,6 +30,8 @@ import dev.dbos.transact.workflow.ForkOptions; import dev.dbos.transact.workflow.ListWorkflowsInput; import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.QueueConflictResolution; +import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.ScheduleStatus; import dev.dbos.transact.workflow.SerializationStrategy; import dev.dbos.transact.workflow.StepInfo; @@ -392,14 +394,48 @@ public Optional getRegisteredWorkflow( return Optional.ofNullable(this.workflowMap.get(fqName)); } - public Collection getQueues() { + public Optional findQueue(String queueName) { + return findStaticQueue(queueName) + .or(() -> queueService.findDynamicQueue(queueName)) + .or(() -> systemDatabase.findQueue(queueName)); + } + + public Collection getStaticQueues() { return this.queueMap.values(); } - public Optional getQueue(String queueName) { + public Optional findStaticQueue(String queueName) { return Optional.ofNullable(this.queueMap.get(queueName)); } + public void registerDynamicQueue( + String name, QueueOptions options, QueueConflictResolution onConflict) { + boolean updateExisting = + switch (onConflict) { + case ALWAYS_UPDATE -> true; + case NEVER_UPDATE -> false; + case UPDATE_IF_LATEST_VERSION -> + appVersion.equals(systemDatabase.getLatestApplicationVersion().versionName()); + }; + systemDatabase.upsertQueue(name, options, updateExisting); + } + + public void updateDynamicQueue(String name, QueueOptions options) { + systemDatabase.updateQueue(name, options); + } + + public Optional findDynamicQueue(String name) { + return systemDatabase.findQueue(name); + } + + public boolean deleteDynamicQueue(String name) { + return systemDatabase.deleteQueue(name); + } + + public List listDynamicQueues() { + return systemDatabase.listQueues(); + } + public void fireAlertHandler(String name, String message, Map metadata) { if (alertHandler != null) { alertHandler.invoke(name, message, metadata); @@ -1424,7 +1460,7 @@ private void validateWorkflow(String workflowName, String className, String inst private void validateQueue(String queueName) { if (queueName != null) { - getQueue(queueName) + findQueue(queueName) .orElseThrow( () -> new IllegalStateException("Queue %s is not registered".formatted(queueName))); } @@ -1437,7 +1473,7 @@ private void validateQueue(String queueName, String queuePartitionKey) { "DBOS internal queue is not a partitioned queue, but a partition key was provided"); } } else { - var queue = this.getQueue(queueName); + var queue = findQueue(queueName); if (queue.isPresent()) { if (queue.get().partitioningEnabled() && queuePartitionKey == null) { throw new IllegalArgumentException( diff --git a/transact/src/main/java/dev/dbos/transact/execution/QueueService.java b/transact/src/main/java/dev/dbos/transact/execution/QueueService.java index 71eee21d..50547379 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/QueueService.java +++ b/transact/src/main/java/dev/dbos/transact/execution/QueueService.java @@ -6,14 +6,18 @@ import java.time.Duration; import java.util.Collection; +import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,12 +25,17 @@ public class QueueService implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(QueueService.class); + private static final Duration MAX_POLLING_INTERVAL = Duration.ofSeconds(120); + private static final long DB_QUEUE_SUPERVISOR_INTERVAL_SEC = 1; private final AtomicReference execServiceRef = new AtomicReference<>(); private final AtomicBoolean paused = new AtomicBoolean(false); + private final Set dbListeningQueues = ConcurrentHashMap.newKeySet(); + private volatile Map dynamicQueueMap = Map.of(); private final SystemDatabase systemDatabase; private final DBOSExecutor dbosExecutor; + private Set listenQueues; private double speedup = 1.0; public QueueService(DBOSExecutor dbosExecutor, SystemDatabase systemDatabase) { @@ -46,12 +55,18 @@ public void unpause() { paused.set(false); } - public void start(Collection queues, Set listenQueues) { + public void start(Collection staticQueues, Set listenQueues) { if (this.execServiceRef.get() == null) { var procCount = Runtime.getRuntime().availableProcessors(); var scheduler = Executors.newScheduledThreadPool(procCount); if (this.execServiceRef.compareAndSet(null, scheduler)) { - startQueueListeners(queues, listenQueues); + this.listenQueues = listenQueues; + scheduler.scheduleAtFixedRate(this::transitionDelayedWorkflows, 1, 1, TimeUnit.SECONDS); + scheduler.scheduleAtFixedRate( + this::pollDynamicQueues, 0, DB_QUEUE_SUPERVISOR_INTERVAL_SEC, TimeUnit.SECONDS); + for (var queue : staticQueues) { + startQueueListenerIfNeeded(queue, false); + } } } } @@ -69,107 +84,144 @@ public boolean isStopped() { return this.execServiceRef.get() == null; } - private void startQueueListeners(Collection queues, Set listenQueues) { - logger.debug("startQueueListeners"); + public Optional findDynamicQueue(String queueName) { + return Optional.ofNullable(dynamicQueueMap.get(queueName)); + } + + private boolean isListening(String queueName) { + return queueName.equals(Constants.DBOS_INTERNAL_QUEUE) + || listenQueues.isEmpty() + || listenQueues.contains(queueName); + } + + private void startQueueListenerIfNeeded(Queue queue, boolean dynamic) { + if (!isListening(queue.name())) return; + if (dynamic && !dbListeningQueues.add(queue.name())) return; + if (execServiceRef.get() == null) return; + + new QueueListenerTask(queue, dynamic) + .schedule(); // executor holds the reference via the scheduled future + } + + // ── Dynamic queue supervisor ────────────────────────────────────────────── + + private void pollDynamicQueues() { + try { + if (execServiceRef.get() == null) return; + + var dbQueues = systemDatabase.listQueues(); + dynamicQueueMap = + dbQueues.stream().collect(Collectors.toUnmodifiableMap(Queue::name, q -> q)); + if (logger.isDebugEnabled()) { + logger.debug("pollDynamicQueues found {} queues", dbQueues.size()); + for (var q : dbQueues) { + logger.debug( + " queue: {} concurrency: {} pollingInterval: {}", + q.name(), + q.concurrency(), + q.pollingInterval()); + } + } + + for (var queue : dbQueues) { + if (dbosExecutor.findStaticQueue(queue.name()).isPresent()) { + logger.warn( + "Database-backed queue {} has the same name as a static queue; " + + "the static queue's configuration is being used and the database-backed queue is ignored.", + queue.name()); + continue; + } + startQueueListenerIfNeeded(queue, true); + } + } catch (Exception e) { + logger.error("pollDynamicQueues failed", e); + } + } + + // ── Queue listener task ─────────────────────────────────────────────────── + + private class QueueListenerTask implements Runnable { - final var executorId = dbosExecutor.executorId(); - final var appVersion = dbosExecutor.appVersion(); - final Duration minPollingInterval = Duration.ofSeconds(1); - final Duration maxPollingInterval = Duration.ofSeconds(120); + Queue queue; + double backoffFactor = 1.0; + final boolean dynamic; + final String executorId = dbosExecutor.executorId(); + final String appVersion = dbosExecutor.appVersion(); - var execService = execServiceRef.get(); - if (execService != null) { - execService.scheduleAtFixedRate(this::transitionDelayedWorkflows, 1, 1, TimeUnit.SECONDS); + QueueListenerTask(Queue queue, boolean dynamic) { + this.queue = queue; + this.dynamic = dynamic; } - for (var _queue : queues) { + void schedule() { + var randomSleepFactor = 0.95 + ThreadLocalRandom.current().nextDouble(0.1); + var delayMs = + (long) (randomSleepFactor * queue.pollingInterval().toMillis() * backoffFactor * speedup); + var svc = execServiceRef.get(); + if (svc != null) { + svc.schedule(this, delayMs, TimeUnit.MILLISECONDS); + } + } - var listening = - _queue.name().equals(Constants.DBOS_INTERNAL_QUEUE) - || listenQueues.isEmpty() - || listenQueues.contains(_queue.name()); - if (!listening) { - continue; + private void processPartition(String partition) { + var partitionLog = Objects.requireNonNullElse(partition, ""); + if (!paused.get()) { + var workflowIds = + systemDatabase.getAndStartQueuedWorkflows(queue, executorId, appVersion, partition); + if (!workflowIds.isEmpty()) { + logger.debug( + "Retrieved {} workflows from {} partition of queue {}", + workflowIds.size(), + partitionLog, + queue.name()); + } + for (var workflowId : workflowIds) { + logger.debug( + "Starting workflow {} from {} partition of queue {}", + workflowId, + partitionLog, + queue.name()); + dbosExecutor.executeWorkflowById(workflowId, false, true); + } } + } - var task = - new Runnable() { - final Queue queue = _queue; - Duration pollingInterval = Duration.ofSeconds(1); - - public void schedule() { - var randomSleepFactor = 0.95 + ThreadLocalRandom.current().nextDouble(0.1); - var delayMs = (long) (randomSleepFactor * pollingInterval.toMillis() * speedup); - var execService = execServiceRef.get(); - if (execService != null) { - execService.schedule(this, delayMs, TimeUnit.MILLISECONDS); - } - } - - private void processPartition(String partition) { - var partitionLog = Objects.requireNonNullElse(partition, ""); - if (!paused.get()) { - var workflowIds = - systemDatabase.getAndStartQueuedWorkflows( - queue, executorId, appVersion, partition); - if (workflowIds.size() > 0) { - logger.debug( - "Retrieved {} workflows from {} partition of queue {}", - workflowIds.size(), - partitionLog, - queue.name()); - } - for (var workflowId : workflowIds) { - logger.debug( - "Starting workflow {} from {} partition of queue {}", - workflowId, - partitionLog, - queue.name()); - dbosExecutor.executeWorkflowById(workflowId, false, true); - } - } - } - - @Override - public void run() { - // if scheduler service isn't running, the queue service was stopped so don't start - // the workflow or schedule the next execution - if (execServiceRef.get() == null) { - return; - } - - try { - if (queue.partitioningEnabled()) { - var partitions = systemDatabase.getQueuePartitions(queue.name()); - for (var partition : partitions) { - processPartition(partition); - } - } else { - processPartition(null); - } - - pollingInterval = Duration.ofMillis((long) (pollingInterval.toMillis() * 0.9)); - pollingInterval = - pollingInterval.compareTo(minPollingInterval) >= 0 - ? pollingInterval - : minPollingInterval; - } catch (Exception e) { - logger.error("Error executing queued workflow(s) for queue {}", queue.name(), e); - pollingInterval = pollingInterval.multipliedBy(2); - pollingInterval = - pollingInterval.compareTo(maxPollingInterval) <= 0 - ? pollingInterval - : maxPollingInterval; - } finally { - this.schedule(); - } - } - }; - - task.schedule(); + @Override + public void run() { + if (execServiceRef.get() == null) return; + if (dynamic) { + var refreshed = systemDatabase.findQueue(queue.name()); + if (refreshed.isEmpty()) { + dbListeningQueues.remove(queue.name()); + return; + } + queue = refreshed.get(); + } + + try { + if (queue.partitioningEnabled()) { + var partitions = systemDatabase.getQueuePartitions(queue.name()); + for (var partition : partitions) { + processPartition(partition); + } + } else { + processPartition(null); + } + + backoffFactor = Math.max(backoffFactor * 0.9, 1.0); + } catch (Exception e) { + logger.error("Error executing queued workflow(s) for queue {}", queue.name(), e); + double maxFactor = + (double) MAX_POLLING_INTERVAL.toMillis() / queue.pollingInterval().toMillis(); + backoffFactor = Math.min(backoffFactor * 2.0, maxFactor); + } finally { + this.schedule(); + } } } + // ── Shared helpers ──────────────────────────────────────────────────────── + private void transitionDelayedWorkflows() { if (!paused.get()) { try { diff --git a/transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java b/transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java index f290ac75..1461551b 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java +++ b/transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java @@ -85,170 +85,168 @@ public void unpause() { private void pollWorkflowSchedules() { try { - pollWorkflowSchedulesImpl(); - } catch (Exception e) { - // Catch all exceptions to prevent scheduleAtFixedRate from permanently suppressing future - // poll invocations. A transient DB failure should not permanently disable the scheduler. - logger.error("pollWorkflowSchedules failed", e); - } - } - - private void pollWorkflowSchedulesImpl() { - // if execServiceRef is null, the scheduler service was shut down so don't poll schedules - if (execServiceRef.get() == null) { - return; - } - - var schedules = dbosExecutor.listSchedules(null, null, null); - if (logger.isDebugEnabled()) { - logger.debug("pollWorkflowSchedules found {} schedules", schedules.size()); - for (var s : schedules) { - logger.debug( - " schedule: {} workflow: {} cron: {}", s.scheduleName(), s.workflowName(), s.cron()); + // if execServiceRef is null, the scheduler service was shut down so don't poll schedules + if (execServiceRef.get() == null) { + return; } - } - // shut down any scheduled future that isn't in the list of current schedules - var currentIds = schedules.stream().map(WorkflowSchedule::id).collect(Collectors.toSet()); - for (var key : workflowScheduleFutures.keySet()) { - if (!currentIds.contains(key)) { - cancelWorkflowSchedule(key); + var schedules = dbosExecutor.listSchedules(null, null, null); + if (logger.isDebugEnabled()) { + logger.debug("pollWorkflowSchedules found {} schedules", schedules.size()); + for (var s : schedules) { + logger.debug( + " schedule: {} workflow: {} cron: {}", s.scheduleName(), s.workflowName(), s.cron()); + } } - } - for (var schedule : schedules) { - var scheduleRunning = workflowScheduleFutures.containsKey(schedule.id()); - if (!schedule.isActive()) { - // if the schedule is no longer active but we still have a scheduled future for it, cancel - // it - if (scheduleRunning) { - cancelWorkflowSchedule(schedule.id()); - } - } else if (!scheduleRunning) { - // if the schedule is active but we don't yet have a scheduled future for it, schedule it - // now - var optRegWf = - dbosExecutor.getRegisteredWorkflow(schedule.workflowName(), schedule.className(), ""); - if (optRegWf.isEmpty()) { - logger.error( - "Workflow schedule {} has missing workflow function {}", - schedule.scheduleName(), - RegisteredWorkflow.fullyQualifiedName(schedule.workflowName(), schedule.className())); - continue; + // shut down any scheduled future that isn't in the list of current schedules + var currentIds = schedules.stream().map(WorkflowSchedule::id).collect(Collectors.toSet()); + for (var key : workflowScheduleFutures.keySet()) { + if (!currentIds.contains(key)) { + cancelWorkflowSchedule(key); } + } - var regWorkflow = optRegWf.orElseThrow(); - if (!Arrays.equals(regWorkflow.workflowMethod().getParameterTypes(), EXPECTED_PARAMETERS)) { - logger.error( - "Workflow schedule {} workflow {} has invalid signature, signature must be (Instant, Object)", - schedule.scheduleName(), - regWorkflow.fullyQualifiedName()); - continue; - } + for (var schedule : schedules) { + var scheduleRunning = workflowScheduleFutures.containsKey(schedule.id()); + if (!schedule.isActive()) { + // if the schedule is no longer active but we still have a scheduled future for it, cancel + // it + if (scheduleRunning) { + cancelWorkflowSchedule(schedule.id()); + } + } else if (!scheduleRunning) { + // if the schedule is active but we don't yet have a scheduled future for it, schedule it + // now + var optRegWf = + dbosExecutor.getRegisteredWorkflow(schedule.workflowName(), schedule.className(), ""); + if (optRegWf.isEmpty()) { + logger.error( + "Workflow schedule {} has missing workflow function {}", + schedule.scheduleName(), + RegisteredWorkflow.fullyQualifiedName( + schedule.workflowName(), schedule.className())); + continue; + } - final String queueName = - Objects.requireNonNullElse(schedule.queueName(), Constants.DBOS_INTERNAL_QUEUE); - if (dbosExecutor.getQueue(queueName).isEmpty()) { - logger.error( - "Workflow schedule {} has invalid queue {}", schedule.scheduleName(), queueName); - continue; - } + var regWorkflow = optRegWf.orElseThrow(); + if (!Arrays.equals( + regWorkflow.workflowMethod().getParameterTypes(), EXPECTED_PARAMETERS)) { + logger.error( + "Workflow schedule {} workflow {} has invalid signature, signature must be (Instant, Object)", + schedule.scheduleName(), + regWorkflow.fullyQualifiedName()); + continue; + } - Cron cron; - try { - cron = CRON_PARSER.parse(schedule.cron()).validate(); - } catch (Exception e) { - logger.error( - "Workflow schedule {} has invalid cron expression {}", - schedule.scheduleName(), - schedule.cron(), - e); - continue; - } + final String queueName = + Objects.requireNonNullElse(schedule.queueName(), Constants.DBOS_INTERNAL_QUEUE); + if (dbosExecutor.findQueue(queueName).isEmpty()) { + logger.error( + "Workflow schedule {} has invalid queue {}", schedule.scheduleName(), queueName); + continue; + } - if (schedule.automaticBackfill() - && schedule.lastFiredAt() != null - && schedule.lastFiredAt().isBefore(Instant.now())) { - dbosExecutor.backfillSchedule( - schedule.scheduleName(), schedule.lastFiredAt(), Instant.now()); - } + Cron cron; + try { + cron = CRON_PARSER.parse(schedule.cron()).validate(); + } catch (Exception e) { + logger.error( + "Workflow schedule {} has invalid cron expression {}", + schedule.scheduleName(), + schedule.cron(), + e); + continue; + } - var task = - new Runnable() { + if (schedule.automaticBackfill() + && schedule.lastFiredAt() != null + && schedule.lastFiredAt().isBefore(Instant.now())) { + dbosExecutor.backfillSchedule( + schedule.scheduleName(), schedule.lastFiredAt(), Instant.now()); + } - final ZoneId timeZone = - Objects.requireNonNullElseGet( - schedule.cronTimezone(), () -> ZoneId.systemDefault()); - final WorkflowSchedule wfSchedule = schedule; - final ExecutionTime executionTime = ExecutionTime.forCron(cron); + var task = + new Runnable() { - ZonedDateTime nextTime = ZonedDateTime.now(timeZone); + final ZoneId timeZone = + Objects.requireNonNullElseGet( + schedule.cronTimezone(), () -> ZoneId.systemDefault()); + final WorkflowSchedule wfSchedule = schedule; + final ExecutionTime executionTime = ExecutionTime.forCron(cron); - public void schedule() { - executionTime - .nextExecution(nextTime) - .ifPresent( - cronTime -> { - this.nextTime = cronTime.truncatedTo(ChronoUnit.SECONDS); - var prevFuture = - workflowScheduleFutures.put( - wfSchedule.id(), scheduleTask(this.nextTime, this)); - // prevFuture should be null or a scheduled task that already fired. - // cancel it anyway just to be sure - if (prevFuture != null) { - if (!prevFuture.isDone()) { - logger.debug( - "Previous scheduled task for {} has not yet completed", - wfSchedule.scheduleName()); - } - prevFuture.cancel(false); - } - }); - } + ZonedDateTime nextTime = ZonedDateTime.now(timeZone); - @Override - public void run() { - // if execServiceRef is null, the scheduler service was shut down so don't start the - // workflow or schedule the next execution - if (execServiceRef.get() == null) { - return; + public void schedule() { + executionTime + .nextExecution(nextTime) + .ifPresent( + cronTime -> { + this.nextTime = cronTime.truncatedTo(ChronoUnit.SECONDS); + var prevFuture = + workflowScheduleFutures.put( + wfSchedule.id(), scheduleTask(this.nextTime, this)); + // prevFuture should be null or a scheduled task that already fired. + // cancel it anyway just to be sure + if (prevFuture != null) { + if (!prevFuture.isDone()) { + logger.debug( + "Previous scheduled task for {} has not yet completed", + wfSchedule.scheduleName()); + } + prevFuture.cancel(false); + } + }); } - try { - if (paused.get()) { + @Override + public void run() { + // if execServiceRef is null, the scheduler service was shut down so don't start + // the workflow or schedule the next execution + if (execServiceRef.get() == null) { + return; + } + + try { + if (paused.get()) { + logger.debug( + "Skipping scheduled workflow {} schedule {} because scheduler is paused", + regWorkflow.fullyQualifiedName(), + wfSchedule.scheduleName()); + return; + } + var args = new Object[] {nextTime.toInstant(), wfSchedule.context()}; + var workflowId = + "sched-%s-%s" + .formatted(wfSchedule.scheduleName(), nextTime.toOffsetDateTime()); logger.debug( - "Skipping scheduled workflow {} schedule {} because scheduler is paused", + "Queuing scheduled workflow {} schedule {} workflowId {}", regWorkflow.fullyQualifiedName(), - wfSchedule.scheduleName()); - return; + wfSchedule.scheduleName(), + workflowId); + var appVersion = dbosExecutor.getLatestApplicationVersion().versionName(); + var options = + new StartWorkflowOptions(workflowId) + .withQueue(queueName) + .withAppVersion(appVersion); + dbosExecutor.startRegisteredWorkflow(regWorkflow, args, options); + systemDatabase.updateScheduleLastFiredAt( + wfSchedule.scheduleName(), nextTime.toInstant()); + } catch (Exception e) { + logger.error("Scheduled task {} exception", schedule.scheduleName(), e); + } finally { + schedule(); } - var args = new Object[] {nextTime.toInstant(), wfSchedule.context()}; - var workflowId = - "sched-%s-%s" - .formatted(wfSchedule.scheduleName(), nextTime.toOffsetDateTime()); - logger.debug( - "Queuing scheduled workflow {} schedule {} workflowId {}", - regWorkflow.fullyQualifiedName(), - wfSchedule.scheduleName(), - workflowId); - var appVersion = dbosExecutor.getLatestApplicationVersion().versionName(); - var options = - new StartWorkflowOptions(workflowId) - .withQueue(queueName) - .withAppVersion(appVersion); - dbosExecutor.startRegisteredWorkflow(regWorkflow, args, options); - systemDatabase.updateScheduleLastFiredAt( - wfSchedule.scheduleName(), nextTime.toInstant()); - } catch (Exception e) { - logger.error("Scheduled task {} exception", schedule.scheduleName(), e); - } finally { - schedule(); } - } - }; + }; - task.schedule(); + task.schedule(); + } } + } catch (Exception e) { + // Catch all exceptions to prevent scheduleAtFixedRate from permanently suppressing future + // poll invocations. A transient DB failure should not permanently disable the scheduler. + logger.error("pollWorkflowSchedules failed", e); } } diff --git a/transact/src/main/java/dev/dbos/transact/workflow/Field.java b/transact/src/main/java/dev/dbos/transact/workflow/Field.java new file mode 100644 index 00000000..d7f75c84 --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/workflow/Field.java @@ -0,0 +1,31 @@ +package dev.dbos.transact.workflow; + +import org.jspecify.annotations.Nullable; + +/** + * Three-state field for partial updates: absent (don't touch), present with a value, or present + * with null (clear the column). + */ +public sealed interface Field permits Field.Absent, Field.Present { + + record Absent() implements Field {} + + record Present(@Nullable T value) implements Field {} + + static Field absent() { + return new Absent<>(); + } + + static Field of(@Nullable T value) { + return new Present<>(value); + } + + default boolean isPresent() { + return this instanceof Present; + } + + default @Nullable T get() { + if (this instanceof Present p) return p.value(); + throw new IllegalStateException("Field.get() called on an absent field"); + } +} diff --git a/transact/src/main/java/dev/dbos/transact/workflow/Queue.java b/transact/src/main/java/dev/dbos/transact/workflow/Queue.java index 46ab31f8..2ce4b37d 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/Queue.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/Queue.java @@ -2,35 +2,45 @@ import java.time.Duration; import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; /** * Property definition for a DBOS workflow queue. Provides options for a name, concurrency and rate * limits, prioritization behavior and partitioned behavior */ public record Queue( - String name, - Integer concurrency, - Integer workerConcurrency, + @NonNull String name, + @Nullable Integer concurrency, + @Nullable Integer workerConcurrency, boolean priorityEnabled, boolean partitioningEnabled, - RateLimit rateLimit) { + @Nullable RateLimit rateLimit, + @NonNull Duration pollingInterval) { + + public static final Duration DEFAULT_POLLING_INTERVAL = Duration.ofSeconds(1); /** Rate limit parameter structure for DBOS workflow queues */ public record RateLimit(int limit, Duration period) {} public Queue { Objects.requireNonNull(name, "Queue name must not be null"); + Objects.requireNonNull(pollingInterval, "Queue pollingInterval must not be null"); if (concurrency != null && concurrency <= 0) throw new IllegalArgumentException( "If specified, queue concurrency must be greater than zero"); if (workerConcurrency != null && workerConcurrency <= 0) throw new IllegalArgumentException( "If specified, queue workerConcurrency must be greater than zero"); + if (pollingInterval.isNegative() || pollingInterval.isZero()) + throw new IllegalArgumentException("Queue pollingInterval must be greater than zero"); } /** Construct a queue with a given name */ - public Queue(String name) { - this(name, null, null, false, false, null); + public Queue(@NonNull String name) { + this(name, null, null, false, false, null, DEFAULT_POLLING_INTERVAL); } /** @@ -41,48 +51,84 @@ public boolean hasLimiter() { } /** Produces a new Queue with the assigned name. */ - public Queue withName(String name) { + public Queue withName(@NonNull String name) { return new Queue( - name, concurrency, workerConcurrency, priorityEnabled, partitioningEnabled, rateLimit); + name, + concurrency, + workerConcurrency, + priorityEnabled, + partitioningEnabled, + rateLimit, + pollingInterval); } /** * Produces a new Queue with the assigned global concurrency. `null` may be specified to remove * the concurrency limit. */ - public Queue withConcurrency(Integer concurrency) { + public Queue withConcurrency(@Nullable Integer concurrency) { return new Queue( - name, concurrency, workerConcurrency, priorityEnabled, partitioningEnabled, rateLimit); + name, + concurrency, + workerConcurrency, + priorityEnabled, + partitioningEnabled, + rateLimit, + pollingInterval); } /** * Produces a new Queue with the assigned per-worker concurrency. `null` may be specified to * remove the concurrency limit. */ - public Queue withWorkerConcurrency(Integer workerConcurrency) { + public Queue withWorkerConcurrency(@Nullable Integer workerConcurrency) { return new Queue( - name, concurrency, workerConcurrency, priorityEnabled, partitioningEnabled, rateLimit); + name, + concurrency, + workerConcurrency, + priorityEnabled, + partitioningEnabled, + rateLimit, + pollingInterval); } /** Produces a new Queue with the prioritization enabled/disabled. */ public Queue withPriorityEnabled(boolean priorityEnabled) { return new Queue( - name, concurrency, workerConcurrency, priorityEnabled, partitioningEnabled, rateLimit); + name, + concurrency, + workerConcurrency, + priorityEnabled, + partitioningEnabled, + rateLimit, + pollingInterval); } /** Produces a new Queue with the partitioned enabled/disabled. */ public Queue withPartitioningEnabled(boolean partitioningEnabled) { return new Queue( - name, concurrency, workerConcurrency, priorityEnabled, partitioningEnabled, rateLimit); + name, + concurrency, + workerConcurrency, + priorityEnabled, + partitioningEnabled, + rateLimit, + pollingInterval); } /** * Produces a new Queue with the assigned rate limit. `null` may be specified to remove the rate * limit. */ - public Queue withRateLimit(RateLimit rateLimit) { + public Queue withRateLimit(@Nullable RateLimit rateLimit) { return new Queue( - name, concurrency, workerConcurrency, priorityEnabled, partitioningEnabled, rateLimit); + name, + concurrency, + workerConcurrency, + priorityEnabled, + partitioningEnabled, + rateLimit, + pollingInterval); } /** @@ -92,10 +138,20 @@ public Queue withRateLimit(int limit, Duration period) { return withRateLimit(new RateLimit(limit, period)); } - /** - * Produces a new Queue with the assigned rate limit, expressed in workflows per period (seconds). - */ - public Queue withRateLimit(int limit, double period) { - return withRateLimit(new RateLimit(limit, Duration.ofMillis((long) (period * 1000)))); + /** Produces a new Queue with the assigned rate limit, expressed in workflows per period. */ + public Queue withRateLimit(int limit, long period, TimeUnit unit) { + return withRateLimit(new RateLimit(limit, Duration.of(period, unit.toChronoUnit()))); + } + + /** Produces a new Queue with the assigned polling interval. */ + public Queue withPollingInterval(@NonNull Duration pollingInterval) { + return new Queue( + name, + concurrency, + workerConcurrency, + priorityEnabled, + partitioningEnabled, + rateLimit, + pollingInterval); } } diff --git a/transact/src/main/java/dev/dbos/transact/workflow/QueueConflictResolution.java b/transact/src/main/java/dev/dbos/transact/workflow/QueueConflictResolution.java new file mode 100644 index 00000000..5aee7d5a --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/workflow/QueueConflictResolution.java @@ -0,0 +1,23 @@ +package dev.dbos.transact.workflow; + +/** + * Controls what happens when {@code registerQueue} is called for a queue that already exists in the + * database. + */ +public enum QueueConflictResolution { + /** + * Overwrite the existing queue configuration unconditionally. Default for {@link + * dev.dbos.transact.DBOSClient}. + */ + ALWAYS_UPDATE, + + /** Leave the existing queue configuration unchanged. */ + NEVER_UPDATE, + + /** + * Overwrite the existing queue configuration only if the running application version matches the + * latest application version registered in the database. Default for {@link + * dev.dbos.transact.DBOS}. + */ + UPDATE_IF_LATEST_VERSION, +} diff --git a/transact/src/main/java/dev/dbos/transact/workflow/QueueOptions.java b/transact/src/main/java/dev/dbos/transact/workflow/QueueOptions.java new file mode 100644 index 00000000..1a679cff --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/workflow/QueueOptions.java @@ -0,0 +1,192 @@ +package dev.dbos.transact.workflow; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import org.jspecify.annotations.Nullable; + +/** + * Configuration options for a DBOS workflow queue. Used for both registration of database-backed + * queues and partial updates to queue configuration. + * + *

When used for registration, absent and null fields both result in the column being null (no + * limit / use default). When used for updates, absent fields are left unchanged in the database + * while null-valued fields clear the column. + * + *

The non-nullable queue properties ({@code priorityEnabled}, {@code partitionQueue}, {@code + * pollingInterval}) use {@link Optional} — {@link Optional#empty()} means use the default on + * creation or leave unchanged on update; a present value sets the column. + */ +public record QueueOptions( + Field concurrency, + Field workerConcurrency, + Field rateLimitMax, + Field rateLimitPeriod, + Optional priorityEnabled, + Optional partitionQueue, + Optional pollingInterval) { + + private static final QueueOptions EMPTY = + new QueueOptions( + Field.absent(), + Field.absent(), + Field.absent(), + Field.absent(), + Optional.empty(), + Optional.empty(), + Optional.empty()); + + public static QueueOptions empty() { + return EMPTY; + } + + public boolean isEmpty() { + return !concurrency.isPresent() + && !workerConcurrency.isPresent() + && !rateLimitMax.isPresent() + && !rateLimitPeriod.isPresent() + && priorityEnabled.isEmpty() + && partitionQueue.isEmpty() + && pollingInterval.isEmpty(); + } + + // ── Static factories ────────────────────────────────────────────────────── + + public static QueueOptions setConcurrency(@Nullable Integer value) { + return EMPTY.withConcurrency(Field.of(value)); + } + + public static QueueOptions setWorkerConcurrency(@Nullable Integer value) { + return EMPTY.withWorkerConcurrency(Field.of(value)); + } + + public static QueueOptions setRateLimit(@Nullable Integer max, @Nullable Duration period) { + return EMPTY.withRateLimitMax(Field.of(max)).withRateLimitPeriod(Field.of(period)); + } + + public static QueueOptions setRateLimit(int limit, long period, TimeUnit unit) { + return setRateLimit(limit, Duration.of(period, unit.toChronoUnit())); + } + + public static QueueOptions setPriorityEnabled(boolean value) { + return EMPTY.withPriorityEnabled(Optional.of(value)); + } + + public static QueueOptions setPartitionQueue(boolean value) { + return EMPTY.withPartitionQueue(Optional.of(value)); + } + + public static QueueOptions setPollingInterval(Duration value) { + return EMPTY.withPollingInterval(Optional.of(value)); + } + + // ── Builders for chaining ───────────────────────────────────────────────── + + public QueueOptions withConcurrency(Field concurrency) { + return new QueueOptions( + concurrency, + workerConcurrency, + rateLimitMax, + rateLimitPeriod, + priorityEnabled, + partitionQueue, + pollingInterval); + } + + public QueueOptions withWorkerConcurrency(Field workerConcurrency) { + return new QueueOptions( + concurrency, + workerConcurrency, + rateLimitMax, + rateLimitPeriod, + priorityEnabled, + partitionQueue, + pollingInterval); + } + + public QueueOptions withRateLimitMax(Field rateLimitMax) { + return new QueueOptions( + concurrency, + workerConcurrency, + rateLimitMax, + rateLimitPeriod, + priorityEnabled, + partitionQueue, + pollingInterval); + } + + public QueueOptions withRateLimitPeriod(Field rateLimitPeriod) { + return new QueueOptions( + concurrency, + workerConcurrency, + rateLimitMax, + rateLimitPeriod, + priorityEnabled, + partitionQueue, + pollingInterval); + } + + public QueueOptions withPriorityEnabled(Optional priorityEnabled) { + return new QueueOptions( + concurrency, + workerConcurrency, + rateLimitMax, + rateLimitPeriod, + priorityEnabled, + partitionQueue, + pollingInterval); + } + + public QueueOptions withPartitionQueue(Optional partitionQueue) { + return new QueueOptions( + concurrency, + workerConcurrency, + rateLimitMax, + rateLimitPeriod, + priorityEnabled, + partitionQueue, + pollingInterval); + } + + public QueueOptions withPollingInterval(Optional pollingInterval) { + return new QueueOptions( + concurrency, + workerConcurrency, + rateLimitMax, + rateLimitPeriod, + priorityEnabled, + partitionQueue, + pollingInterval); + } + + // ── Convenience chaining methods ────────────────────────────────────────── + + public QueueOptions andConcurrency(@Nullable Integer value) { + return withConcurrency(Field.of(value)); + } + + public QueueOptions andWorkerConcurrency(@Nullable Integer value) { + return withWorkerConcurrency(Field.of(value)); + } + + public QueueOptions andRateLimit(@Nullable Integer max, @Nullable Duration period) { + return withRateLimitMax(Field.of(max)).withRateLimitPeriod(Field.of(period)); + } + + public QueueOptions andRateLimit(int max, long period, TimeUnit unit) { + return andRateLimit(max, Duration.of(period, unit.toChronoUnit())); + } + + public QueueOptions andPriorityEnabled(boolean value) { + return withPriorityEnabled(Optional.of(value)); + } + + public QueueOptions andPartitionQueue(boolean value) { + return withPartitionQueue(Optional.of(value)); + } + + public QueueOptions andPollingInterval(Duration value) { + return withPollingInterval(Optional.of(value)); + } +} diff --git a/transact/src/test/java/dev/dbos/transact/admin/AdminServerTest.java b/transact/src/test/java/dev/dbos/transact/admin/AdminServerTest.java index bdfcf0f4..9c8d916d 100644 --- a/transact/src/test/java/dev/dbos/transact/admin/AdminServerTest.java +++ b/transact/src/test/java/dev/dbos/transact/admin/AdminServerTest.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import io.restassured.response.Response; import org.junit.jupiter.api.BeforeEach; @@ -106,7 +107,7 @@ public void ensurePostJsonNotJson() throws IOException { @Test public void exceptionCatching500() throws IOException { var exception = new RuntimeException("test-exception"); - when(mockExec.getQueues()).thenThrow(exception); + when(mockExec.getStaticQueues()).thenThrow(exception); try (var server = new AdminServer(port, mockExec, mockDB)) { server.start(); @@ -197,9 +198,9 @@ public void queueMetadata() throws IOException { .withConcurrency(10) .withWorkerConcurrency(5) .withPriorityEnabled(true) - .withRateLimit(2, 4.0); + .withRateLimit(2, 4, TimeUnit.SECONDS); - when(mockExec.getQueues()).thenReturn(List.of(queue1, queue2)); + when(mockExec.getStaticQueues()).thenReturn(List.of(queue1, queue2)); try (var server = new AdminServer(port, mockExec, mockDB)) { server.start(); diff --git a/transact/src/test/java/dev/dbos/transact/client/ClientQueueTest.java b/transact/src/test/java/dev/dbos/transact/client/ClientQueueTest.java new file mode 100644 index 00000000..2487d32a --- /dev/null +++ b/transact/src/test/java/dev/dbos/transact/client/ClientQueueTest.java @@ -0,0 +1,114 @@ +package dev.dbos.transact.client; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import dev.dbos.transact.utils.PgContainer; +import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.QueueConflictResolution; +import dev.dbos.transact.workflow.QueueOptions; + +import org.junit.jupiter.api.AutoClose; +import org.junit.jupiter.api.Test; + +public class ClientQueueTest { + + @AutoClose final PgContainer pgContainer = new PgContainer(); + + @Test + public void testClientRegisterAndFindQueue() { + try (var client = pgContainer.dbosClient()) { + client.registerQueue("cq-find", QueueOptions.setConcurrency(7)); + + var q = client.findQueue("cq-find").orElseThrow(); + assertEquals("cq-find", q.name()); + assertEquals(7, q.concurrency()); + + assertTrue(client.findQueue("cq-does-not-exist").isEmpty()); + } + } + + @Test + public void testClientListQueues() { + try (var client = pgContainer.dbosClient()) { + client.registerQueue("cq-list-1", QueueOptions.setConcurrency(1)); + client.registerQueue("cq-list-2", QueueOptions.setConcurrency(2)); + client.registerQueue("cq-list-3", QueueOptions.empty()); + + var names = client.listQueues().stream().map(Queue::name).toList(); + assertTrue(names.contains("cq-list-1")); + assertTrue(names.contains("cq-list-2")); + assertTrue(names.contains("cq-list-3")); + } + } + + @Test + public void testClientDeleteQueue() { + try (var client = pgContainer.dbosClient()) { + client.registerQueue("cq-del", QueueOptions.empty()); + assertTrue(client.findQueue("cq-del").isPresent()); + + assertTrue(client.deleteQueue("cq-del")); + assertTrue(client.findQueue("cq-del").isEmpty()); + + assertFalse(client.deleteQueue("cq-never-existed")); + } + } + + @Test + public void testClientUpdateQueue() { + try (var client = pgContainer.dbosClient()) { + client.registerQueue("cq-update", QueueOptions.setConcurrency(5)); + assertEquals(5, client.findQueue("cq-update").orElseThrow().concurrency()); + + client.updateQueue("cq-update", QueueOptions.setConcurrency(10)); + assertEquals(10, client.findQueue("cq-update").orElseThrow().concurrency()); + } + } + + @Test + public void testClientNeverUpdate() { + try (var client = pgContainer.dbosClient()) { + client.registerQueue("cq-never", QueueOptions.setConcurrency(5)); + client.registerQueue( + "cq-never", QueueOptions.setConcurrency(99), QueueConflictResolution.NEVER_UPDATE); + + assertEquals(5, client.findQueue("cq-never").orElseThrow().concurrency()); + } + } + + @Test + public void testClientAlwaysUpdate() { + try (var client = pgContainer.dbosClient()) { + client.registerQueue("cq-always", QueueOptions.setConcurrency(5)); + client.registerQueue( + "cq-always", QueueOptions.setConcurrency(99), QueueConflictResolution.ALWAYS_UPDATE); + + assertEquals(99, client.findQueue("cq-always").orElseThrow().concurrency()); + } + } + + @Test + public void testClientRejectsUpdateIfLatestVersion() { + try (var client = pgContainer.dbosClient()) { + assertThrows( + IllegalArgumentException.class, + () -> + client.registerQueue( + "cq-version", + QueueOptions.empty(), + QueueConflictResolution.UPDATE_IF_LATEST_VERSION)); + } + } + + @Test + public void testClientRejectsInternalQueueName() { + try (var client = pgContainer.dbosClient()) { + assertThrows( + IllegalArgumentException.class, + () -> client.registerQueue("_dbos_internal_queue", QueueOptions.empty())); + } + } +} diff --git a/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java b/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java index 937d7547..3c5dcf17 100644 --- a/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java +++ b/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java @@ -3265,4 +3265,136 @@ public void canGetWorkflowStreamsThrows() throws Exception { assertEquals(0, json.get("streams").size()); } } + + @RetryingTest(3) + public void canListQueues() throws Exception { + MessageListener listener = new MessageListener(); + testServer.setListener(listener); + + dev.dbos.transact.workflow.Queue q1 = + new dev.dbos.transact.workflow.Queue("queue-1") + .withConcurrency(5) + .withWorkerConcurrency(2) + .withRateLimit(10, Duration.ofSeconds(60)) + .withPriorityEnabled(true) + .withPartitioningEnabled(true) + .withPollingInterval(Duration.ofMillis(500)); + dev.dbos.transact.workflow.Queue q2 = new dev.dbos.transact.workflow.Queue("queue-2"); + when(mockDB.listQueues()).thenReturn(List.of(q1, q2)); + + try (Conductor conductor = builder.build()) { + conductor.start(); + assertTrue(listener.openLatch.await(5, TimeUnit.SECONDS), "open latch timed out"); + + listener.send(MessageType.LIST_QUEUES, "req-list-queues", Map.of()); + assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out"); + + verify(mockDB).listQueues(); + + JsonNode json = mapper.readTree(listener.message); + assertEquals("list_queues", json.get("type").asText()); + assertEquals("req-list-queues", json.get("request_id").asText()); + assertNull(json.get("error_message")); + + JsonNode output = json.get("output"); + assertNotNull(output); + assertTrue(output.isArray()); + assertEquals(2, output.size()); + + JsonNode first = output.get(0); + assertEquals("queue-1", first.get("name").asText()); + assertEquals(5, first.get("concurrency").asInt()); + assertEquals(2, first.get("worker_concurrency").asInt()); + assertEquals(10, first.get("rate_limit_max").asInt()); + assertEquals(60.0, first.get("rate_limit_period_sec").asDouble(), 0.001); + assertTrue(first.get("priority_enabled").asBoolean()); + assertTrue(first.get("partition_queue").asBoolean()); + assertEquals(0.5, first.get("polling_interval_sec").asDouble(), 0.001); + + JsonNode second = output.get(1); + assertEquals("queue-2", second.get("name").asText()); + assertTrue(second.get("concurrency").isNull()); + assertTrue(second.get("worker_concurrency").isNull()); + assertTrue(second.get("rate_limit_max").isNull()); + assertTrue(second.get("rate_limit_period_sec").isNull()); + assertFalse(second.get("priority_enabled").asBoolean()); + assertFalse(second.get("partition_queue").asBoolean()); + assertEquals(1.0, second.get("polling_interval_sec").asDouble(), 0.001); + } + } + + @RetryingTest(3) + public void canListQueuesThrows() throws Exception { + MessageListener listener = new MessageListener(); + testServer.setListener(listener); + + String errorMessage = "canListQueuesThrows error"; + doThrow(new RuntimeException(errorMessage)).when(mockDB).listQueues(); + + try (Conductor conductor = builder.build()) { + conductor.start(); + assertTrue(listener.openLatch.await(5, TimeUnit.SECONDS), "open latch timed out"); + + listener.send(MessageType.LIST_QUEUES, "req-list-queues-err", Map.of()); + assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out"); + + JsonNode json = mapper.readTree(listener.message); + assertEquals("list_queues", json.get("type").asText()); + assertEquals(errorMessage, json.get("error_message").asText()); + assertEquals(0, json.get("output").size()); + } + } + + @RetryingTest(3) + public void canGetQueue() throws Exception { + MessageListener listener = new MessageListener(); + testServer.setListener(listener); + + dev.dbos.transact.workflow.Queue queue = + new dev.dbos.transact.workflow.Queue("my-queue").withConcurrency(3); + when(mockDB.findQueue("my-queue")).thenReturn(Optional.of(queue)); + + try (Conductor conductor = builder.build()) { + conductor.start(); + assertTrue(listener.openLatch.await(5, TimeUnit.SECONDS), "open latch timed out"); + + listener.send(MessageType.GET_QUEUE, "req-get-queue", Map.of("name", "my-queue")); + assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out"); + + verify(mockDB).findQueue("my-queue"); + + JsonNode json = mapper.readTree(listener.message); + assertEquals("get_queue", json.get("type").asText()); + assertEquals("req-get-queue", json.get("request_id").asText()); + assertNull(json.get("error_message")); + + JsonNode output = json.get("output"); + assertNotNull(output); + assertEquals("my-queue", output.get("name").asText()); + assertEquals(3, output.get("concurrency").asInt()); + assertTrue(output.get("worker_concurrency").isNull()); + } + } + + @RetryingTest(3) + public void canGetQueueNotFound() throws Exception { + MessageListener listener = new MessageListener(); + testServer.setListener(listener); + + when(mockDB.findQueue("nonexistent")).thenReturn(Optional.empty()); + + try (Conductor conductor = builder.build()) { + conductor.start(); + assertTrue(listener.openLatch.await(5, TimeUnit.SECONDS), "open latch timed out"); + + listener.send(MessageType.GET_QUEUE, "req-get-queue-404", Map.of("name", "nonexistent")); + assertTrue(listener.messageLatch.await(1, TimeUnit.SECONDS), "message latch timed out"); + + JsonNode json = mapper.readTree(listener.message); + assertEquals("get_queue", json.get("type").asText()); + assertEquals("req-get-queue-404", json.get("request_id").asText()); + assertNull(json.get("error_message")); + assertTrue(!json.has("output") || json.get("output").isNull()); + } + } } diff --git a/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java b/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java index 73396c7c..0a737276 100644 --- a/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java +++ b/transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java @@ -23,6 +23,8 @@ import dev.dbos.transact.workflow.ExportedWorkflow; import dev.dbos.transact.workflow.ForkOptions; import dev.dbos.transact.workflow.GetWorkflowAggregatesInput; +import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.QueueOptions; import dev.dbos.transact.workflow.ScheduleStatus; import dev.dbos.transact.workflow.VersionInfo; import dev.dbos.transact.workflow.WorkflowDelay; @@ -1566,4 +1568,165 @@ public void testGetAllNotificationsEmpty() throws Exception { var notifications = sysdb.getAllNotifications(wfId); assertTrue(notifications.isEmpty()); } + + // --- Queue CRUD tests --- + + @Test + public void testUpsertQueueInsert() { + var options = + QueueOptions.setConcurrency(5) + .andWorkerConcurrency(2) + .andPriorityEnabled(true) + .andRateLimit(10, 60, java.util.concurrent.TimeUnit.SECONDS); + + boolean inserted = sysdb.upsertQueue("q-insert", options, true); + assertTrue(inserted, "upsertQueue should return true when the row is new"); + + var fetched = sysdb.findQueue("q-insert"); + assertTrue(fetched.isPresent()); + var q = fetched.get(); + assertEquals("q-insert", q.name()); + assertEquals(5, q.concurrency()); + assertEquals(2, q.workerConcurrency()); + assertTrue(q.priorityEnabled()); + assertNotNull(q.rateLimit()); + assertEquals(10, q.rateLimit().limit()); + assertEquals(Duration.ofSeconds(60), q.rateLimit().period()); + } + + @Test + public void testUpsertQueueOptionsExisting() { + sysdb.upsertQueue("q-update", QueueOptions.setConcurrency(3), true); + + boolean inserted = + sysdb.upsertQueue("q-update", QueueOptions.setConcurrency(7).andWorkerConcurrency(4), true); + assertFalse(inserted, "upsertQueue should return false when the row already existed"); + + var fetched = sysdb.findQueue("q-update").orElseThrow(); + assertEquals(7, fetched.concurrency()); + assertEquals(4, fetched.workerConcurrency()); + } + + @Test + public void testUpsertQueueNoUpdateExisting() { + sysdb.upsertQueue("q-no-update", QueueOptions.setConcurrency(3), true); + + boolean inserted = sysdb.upsertQueue("q-no-update", QueueOptions.setConcurrency(99), false); + assertFalse(inserted, "upsertQueue should return false when the row already existed"); + + var fetched = sysdb.findQueue("q-no-update").orElseThrow(); + assertEquals( + 3, fetched.concurrency(), "concurrency should be unchanged when updateExisting=false"); + } + + @Test + public void testGetQueueFromDBMissing() { + var result = sysdb.findQueue("does-not-exist"); + assertTrue(result.isEmpty()); + } + + @Test + public void testListQueuesFromDB() { + sysdb.upsertQueue("q-list-a", QueueOptions.setConcurrency(1), true); + sysdb.upsertQueue("q-list-b", QueueOptions.setConcurrency(2), true); + sysdb.upsertQueue("q-list-c", QueueOptions.empty(), true); + + var queues = sysdb.listQueues(); + var names = queues.stream().map(Queue::name).toList(); + assertTrue(names.contains("q-list-a")); + assertTrue(names.contains("q-list-b")); + assertTrue(names.contains("q-list-c")); + } + + @Test + public void testDeleteQueue() { + sysdb.upsertQueue("q-delete", QueueOptions.setConcurrency(1), true); + assertTrue(sysdb.findQueue("q-delete").isPresent()); + + boolean deleted = sysdb.deleteQueue("q-delete"); + assertTrue(deleted); + assertTrue(sysdb.findQueue("q-delete").isEmpty()); + } + + @Test + public void testDeleteQueueMissing() { + assertFalse(sysdb.deleteQueue("q-never-existed")); + } + + @Test + public void testUpdateQueuePartialConcurrency() { + sysdb.upsertQueue( + "q-partial", + QueueOptions.setConcurrency(5) + .andPriorityEnabled(true) + .andRateLimit(10, 60, java.util.concurrent.TimeUnit.SECONDS), + true); + + sysdb.updateQueue("q-partial", QueueOptions.setConcurrency(99)); + + var q = sysdb.findQueue("q-partial").orElseThrow(); + assertEquals(99, q.concurrency(), "concurrency should be updated"); + assertTrue(q.priorityEnabled(), "priorityEnabled should be unchanged"); + assertNotNull(q.rateLimit(), "rateLimit should be unchanged"); + assertEquals(10, q.rateLimit().limit()); + } + + @Test + public void testUpdateQueueClearConcurrency() { + sysdb.upsertQueue("q-clear-conc", QueueOptions.setConcurrency(5), true); + + sysdb.updateQueue("q-clear-conc", QueueOptions.setConcurrency(null)); + + var q = sysdb.findQueue("q-clear-conc").orElseThrow(); + assertNull(q.concurrency(), "concurrency should be cleared to null"); + } + + @Test + public void testUpdateQueueClearRateLimit() { + sysdb.upsertQueue( + "q-clear-rate", + QueueOptions.setRateLimit(5, 30, java.util.concurrent.TimeUnit.SECONDS), + true); + + sysdb.updateQueue("q-clear-rate", QueueOptions.setRateLimit(null, null)); + + var q = sysdb.findQueue("q-clear-rate").orElseThrow(); + assertNull(q.rateLimit(), "rateLimit should be cleared to null"); + } + + @Test + public void testUpdateQueueEmpty() { + sysdb.upsertQueue("q-empty-update", QueueOptions.setConcurrency(5), true); + + // Empty update should be a no-op (no exception, no change) + var emptyUpdate = QueueOptions.empty(); + sysdb.updateQueue("q-empty-update", emptyUpdate); + + var q = sysdb.findQueue("q-empty-update").orElseThrow(); + assertEquals(5, q.concurrency()); + } + + @Test + public void testUpsertQueueRoundTrip() { + sysdb.upsertQueue( + "q-roundtrip", + QueueOptions.setConcurrency(8) + .andWorkerConcurrency(4) + .andPriorityEnabled(true) + .andPartitionQueue(true) + .andRateLimit(20, 30, java.util.concurrent.TimeUnit.SECONDS) + .andPollingInterval(Duration.ofSeconds(5)), + true); + var fetched = sysdb.findQueue("q-roundtrip").orElseThrow(); + + assertEquals("q-roundtrip", fetched.name()); + assertEquals(8, fetched.concurrency()); + assertEquals(4, fetched.workerConcurrency()); + assertTrue(fetched.priorityEnabled()); + assertTrue(fetched.partitioningEnabled()); + assertNotNull(fetched.rateLimit()); + assertEquals(20, fetched.rateLimit().limit()); + assertEquals(Duration.ofSeconds(30), fetched.rateLimit().period()); + assertEquals(Duration.ofSeconds(5), fetched.pollingInterval()); + } } diff --git a/transact/src/test/java/dev/dbos/transact/queue/DynamicQueuesTest.java b/transact/src/test/java/dev/dbos/transact/queue/DynamicQueuesTest.java new file mode 100644 index 00000000..e8be3e6c --- /dev/null +++ b/transact/src/test/java/dev/dbos/transact/queue/DynamicQueuesTest.java @@ -0,0 +1,1003 @@ +package dev.dbos.transact.queue; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import dev.dbos.transact.Constants; +import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; +import dev.dbos.transact.StartWorkflowOptions; +import dev.dbos.transact.config.DBOSConfig; +import dev.dbos.transact.json.SerializationUtil; +import dev.dbos.transact.utils.DBUtils; +import dev.dbos.transact.utils.PgContainer; +import dev.dbos.transact.utils.WorkflowStatusInternalBuilder; +import dev.dbos.transact.workflow.ListWorkflowsInput; +import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.QueueConflictResolution; +import dev.dbos.transact.workflow.QueueOptions; +import dev.dbos.transact.workflow.WorkflowHandle; +import dev.dbos.transact.workflow.WorkflowState; +import dev.dbos.transact.workflow.WorkflowStatus; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BooleanSupplier; + +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.AutoClose; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DynamicQueuesTest { + + private static final Logger logger = LoggerFactory.getLogger(DynamicQueuesTest.class); + + @AutoClose final PgContainer pgContainer = new PgContainer(); + + DBOSConfig dbosConfig; + @AutoClose DBOS dbos; + @AutoClose HikariDataSource dataSource; + + @BeforeEach + void beforeEach() { + dbosConfig = pgContainer.dbosConfig(); + dbos = new DBOS(dbosConfig); + dataSource = pgContainer.dataSource(); + } + + @Test + public void testDynamicQueueWorkflowExecution() throws Exception { + ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl()); + dbos.launch(); + + var qs = DBOSTestAccess.getQueueService(dbos); + qs.setSpeedupForTest(); + + // Register a dynamic queue after launch — this writes to DB. + dbos.registerQueue("dynQueue", QueueOptions.empty()); + + // The supervisor polls every 1s; wait for it to discover and start a listener. + var handle = + dbos.startWorkflow( + () -> serviceQ.simpleQWorkflow("hello"), + new StartWorkflowOptions().withQueue("dynQueue")); + + assertEquals("hellohello", handle.getResult()); + assertEquals(WorkflowState.SUCCESS, handle.getStatus().status()); + } + + @Test + public void testDynamicQueueConcurrency() throws Exception { + ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl()); + dbos.launch(); + + var qs = DBOSTestAccess.getQueueService(dbos); + qs.setSpeedupForTest(); + + dbos.registerQueue("concQ", QueueOptions.setConcurrency(1).andWorkerConcurrency(1)); + + for (int i = 0; i < 3; i++) { + String id = "dynwf" + i; + String input = "v" + i; + dbos.startWorkflow( + () -> serviceQ.simpleQWorkflow(input), new StartWorkflowOptions(id).withQueue("concQ")); + } + + for (int i = 0; i < 3; i++) { + var handle = dbos.retrieveWorkflow("dynwf" + i); + assertEquals("v" + i + "v" + i, handle.getResult()); + assertEquals(WorkflowState.SUCCESS, handle.getStatus().status()); + } + } + + @Test + public void testListQueues() throws Exception { + dbos.launch(); + + dbos.registerQueue("q-list-1", QueueOptions.setConcurrency(1)); + dbos.registerQueue("q-list-2", QueueOptions.setConcurrency(2)); + dbos.registerQueue("q-list-3", QueueOptions.empty()); + + var queues = dbos.listQueues(); + var names = queues.stream().map(Queue::name).toList(); + assertTrue(names.contains("q-list-1")); + assertTrue(names.contains("q-list-2")); + assertTrue(names.contains("q-list-3")); + assertEquals(3, names.size()); + } + + @Test + public void testDeleteQueue() throws Exception { + dbos.launch(); + + dbos.registerQueue("q-del", QueueOptions.setConcurrency(1)); + assertTrue(dbos.listQueues().stream().anyMatch(q -> q.name().equals("q-del"))); + + boolean deleted = dbos.deleteQueue("q-del"); + assertTrue(deleted); + assertFalse(dbos.listQueues().stream().anyMatch(q -> q.name().equals("q-del"))); + + // deleting a non-existent queue returns false + assertFalse(dbos.deleteQueue("q-never-existed")); + } + + @Test + public void testUpdateQueue() throws Exception { + dbos.launch(); + + dbos.registerQueue("q-update", QueueOptions.setConcurrency(5)); + + var before = + dbos.listQueues().stream() + .filter(x -> x.name().equals("q-update")) + .findFirst() + .orElseThrow(); + assertEquals(5, before.concurrency()); + + dbos.updateQueue("q-update", QueueOptions.setConcurrency(10)); + + var after = + dbos.listQueues().stream() + .filter(x -> x.name().equals("q-update")) + .findFirst() + .orElseThrow(); + assertEquals(10, after.concurrency()); + } + + @Test + public void testRegisterQueueNeverUpdate() throws Exception { + dbos.launch(); + + dbos.registerQueue("q-conflict", QueueOptions.setConcurrency(5)); + + // NEVER_UPDATE: second call should not overwrite + dbos.registerQueue( + "q-conflict", QueueOptions.setConcurrency(99), QueueConflictResolution.NEVER_UPDATE); + + var q = + dbos.listQueues().stream() + .filter(x -> x.name().equals("q-conflict")) + .findFirst() + .orElseThrow(); + assertEquals(5, q.concurrency()); + } + + @Test + public void testRegisterQueueAlwaysUpdate() throws Exception { + dbos.launch(); + + dbos.registerQueue("q-always", QueueOptions.setConcurrency(5)); + + // ALWAYS_UPDATE: second call should overwrite + dbos.registerQueue( + "q-always", QueueOptions.setConcurrency(99), QueueConflictResolution.ALWAYS_UPDATE); + + var q = + dbos.listQueues().stream() + .filter(x -> x.name().equals("q-always")) + .findFirst() + .orElseThrow(); + assertEquals(99, q.concurrency()); + } + + @Test + public void testDynamicQueuePollingInterval() throws Exception { + dbos.launch(); + + var interval = Duration.ofSeconds(3); + dbos.registerQueue("q-poll", QueueOptions.setPollingInterval(interval)); + + var q = + dbos.listQueues().stream().filter(x -> x.name().equals("q-poll")).findFirst().orElseThrow(); + assertEquals(interval, q.pollingInterval()); + } + + @Test + public void testRegisterInternalQueueThrows() throws Exception { + dbos.launch(); + + assertThrows( + IllegalArgumentException.class, + () -> dbos.registerQueue("_dbos_internal_queue", QueueOptions.empty())); + } + + @Test + public void testDeleteAndRecreateQueue() throws Exception { + ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl()); + dbos.launch(); + + var qs = DBOSTestAccess.getQueueService(dbos); + qs.setSpeedupForTest(); + + dbos.registerQueue("q-lifecycle", QueueOptions.setConcurrency(5)); + + var h1 = + dbos.startWorkflow( + () -> serviceQ.simpleQWorkflow("first"), + new StartWorkflowOptions("lc-wf1").withQueue("q-lifecycle")); + assertEquals("firstfirst", h1.getResult()); + + dbos.deleteQueue("q-lifecycle"); + assertFalse(dbos.listQueues().stream().anyMatch(x -> x.name().equals("q-lifecycle"))); + + // Wait for the old listener to detect the deletion and remove itself from the + // active-listener set. Without this wait the supervisor may not start a fresh + // listener for the recreated queue (dbListeningQueues still contains the name). + Thread.sleep(500); + + // Recreate with different config. + dbos.registerQueue("q-lifecycle", QueueOptions.setConcurrency(2)); + var recreated = + dbos.listQueues().stream() + .filter(x -> x.name().equals("q-lifecycle")) + .findFirst() + .orElseThrow(); + assertEquals(2, recreated.concurrency()); + + var h2 = + dbos.startWorkflow( + () -> serviceQ.simpleQWorkflow("second"), + new StartWorkflowOptions("lc-wf2").withQueue("q-lifecycle")); + assertEquals("secondsecond", h2.getResult()); + } + + @Test + public void testStaticAndDynamicQueueSameName() throws Exception { + // Static queue registered pre-launch. + var staticQ = new Queue("q-shared").withConcurrency(3); + dbos.registerQueue(staticQ); + ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl()); + dbos.launch(); + + var qs = DBOSTestAccess.getQueueService(dbos); + qs.setSpeedupForTest(); + + // Register same name as a dynamic queue — supervisor should ignore the DB entry. + dbos.registerQueue("q-shared", QueueOptions.setConcurrency(99)); + + // Workflow still executes — static listener handles it. + var handle = + dbos.startWorkflow( + () -> serviceQ.simpleQWorkflow("shared"), + new StartWorkflowOptions().withQueue("q-shared")); + assertEquals("sharedshared", handle.getResult()); + assertEquals(WorkflowState.SUCCESS, handle.getStatus().status()); + } + + @Test + public void testDynamicConcurrencyTakesEffect() throws Exception { + ConcurrencyTestServiceImpl impl = new ConcurrencyTestServiceImpl(); + ConcurrencyTestService service = dbos.registerProxy(ConcurrencyTestService.class, impl); + dbos.launch(); + + var qs = DBOSTestAccess.getQueueService(dbos); + qs.setSpeedupForTest(); + + // Start with concurrency=1 so only one workflow dequeues at a time. + dbos.registerQueue("dyn-update-q", QueueOptions.setConcurrency(1)); + + var h1 = + dbos.startWorkflow( + () -> service.blockedWorkflow(0), + new StartWorkflowOptions("dyn-wf1").withQueue("dyn-update-q")); + var h2 = + dbos.startWorkflow( + () -> service.blockedWorkflow(1), + new StartWorkflowOptions("dyn-wf2").withQueue("dyn-update-q")); + var h3 = + dbos.startWorkflow( + () -> service.blockedWorkflow(2), + new StartWorkflowOptions("dyn-wf3").withQueue("dyn-update-q")); + + // Wait for exactly one workflow to be dequeued and start running. + impl.wfSemaphore.acquire(1); + + // With concurrency=1 the other two should still be waiting. + Thread.sleep(200); + assertEquals(WorkflowState.ENQUEUED, h2.getStatus().status()); + assertEquals(WorkflowState.ENQUEUED, h3.getStatus().status()); + + // Bump concurrency. The runner reloads queue settings on its next poll and + // should immediately dequeue the remaining two workflows. + dbos.updateQueue("dyn-update-q", QueueOptions.setConcurrency(3)); + impl.wfSemaphore.acquire(2); + + // Release all blocked workflows and verify they complete successfully. + impl.latch.countDown(); + assertEquals(0, h1.getResult()); + assertEquals(1, h2.getResult()); + assertEquals(2, h3.getResult()); + } + + @Test + public void testDynamicQueueMapUpdatedOnRegister() throws Exception { + dbos.launch(); + var qs = DBOSTestAccess.getQueueService(dbos); + + assertFalse(qs.findDynamicQueue("q-map-reg").isPresent()); + + dbos.registerQueue("q-map-reg", QueueOptions.setConcurrency(5)); + awaitCondition(() -> qs.findDynamicQueue("q-map-reg").isPresent()); + + assertEquals(5, qs.findDynamicQueue("q-map-reg").get().concurrency()); + } + + @Test + public void testDynamicQueueMapUpdatedOnUpdate() throws Exception { + dbos.launch(); + var qs = DBOSTestAccess.getQueueService(dbos); + + dbos.registerQueue("q-map-upd", QueueOptions.setConcurrency(5)); + awaitCondition(() -> qs.findDynamicQueue("q-map-upd").isPresent()); + + dbos.updateQueue("q-map-upd", QueueOptions.setConcurrency(10)); + awaitCondition( + () -> + qs.findDynamicQueue("q-map-upd") + .filter(q -> Integer.valueOf(10).equals(q.concurrency())) + .isPresent()); + + assertEquals(10, qs.findDynamicQueue("q-map-upd").get().concurrency()); + } + + @Test + public void testDynamicQueueMapUpdatedOnDelete() throws Exception { + dbos.launch(); + var qs = DBOSTestAccess.getQueueService(dbos); + + dbos.registerQueue("q-map-del", QueueOptions.empty()); + awaitCondition(() -> qs.findDynamicQueue("q-map-del").isPresent()); + + dbos.deleteQueue("q-map-del"); + awaitCondition(() -> qs.findDynamicQueue("q-map-del").isEmpty()); + } + + @Test + public void testRegisterQueueValidation() throws Exception { + dbos.launch(); + + // Zero or negative polling interval should fail. + assertThrows( + IllegalArgumentException.class, + () -> + dbos.registerQueue( + "q-bad-poll", QueueOptions.setPollingInterval(Duration.ofSeconds(-1)))); + assertThrows( + IllegalArgumentException.class, + () -> dbos.registerQueue("q-bad-poll", QueueOptions.setPollingInterval(Duration.ZERO))); + + // Zero or negative concurrency should fail. + assertThrows( + IllegalArgumentException.class, + () -> dbos.registerQueue("q-bad-conc", QueueOptions.setConcurrency(0))); + } + + @Test + public void testDedupeId() throws Exception { + + ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl()); + dbos.launch(); + + var qs = DBOSTestAccess.getQueueService(dbos); + qs.setSpeedupForTest(); + dbos.registerQueue("firstQueue", QueueOptions.empty()); + + // pause queue service for test validation + qs.pause(); + + var options = new StartWorkflowOptions().withQueue("firstQueue"); + var dedupeId = "dedupeId"; + var h1 = + dbos.startWorkflow( + () -> serviceQ.simpleQWorkflow("abc"), options.withDeduplicationId(dedupeId)); + var s1 = h1.getStatus(); + assertEquals(s1.queueName(), "firstQueue"); + assertEquals(s1.deduplicationId(), dedupeId); + + // enqueue with different dedupe ID should be fine + var dedupeId2 = "different-dedupeId"; + var h2 = + dbos.startWorkflow( + () -> serviceQ.simpleQWorkflow("def"), options.withDeduplicationId(dedupeId2)); + var s2 = h2.getStatus(); + assertEquals(s2.queueName(), "firstQueue"); + assertEquals(s2.deduplicationId(), dedupeId2); + + // enqueue with no dedupe ID should be fine + var h3 = dbos.startWorkflow(() -> serviceQ.simpleQWorkflow("ghi"), options); + var s3 = h3.getStatus(); + assertEquals(s3.queueName(), "firstQueue"); + assertNull(s3.deduplicationId()); + + assertThrows( + RuntimeException.class, + () -> + dbos.startWorkflow( + () -> serviceQ.simpleQWorkflow("jkl"), options.withDeduplicationId(dedupeId))); + + // enable queue service to run + qs.unpause(); + + // wait for initial workflow with initial dedupe ID to finish + h1.getResult(); + h2.getResult(); + h3.getResult(); + + var h4 = + dbos.startWorkflow( + () -> serviceQ.simpleQWorkflow("jkl"), options.withDeduplicationId(dedupeId)); + h4.getResult(); + + var rows = DBUtils.getWorkflowRows(dataSource); + assertEquals(4, rows.size()); + + for (var row : rows) { + assertEquals(WorkflowState.SUCCESS.name(), row.status()); + assertEquals("firstQueue", row.queueName()); + assertNull(row.deduplicationId()); + } + } + + @Test + public void testDedupeIdWithDelay() throws Exception { + + ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl()); + dbos.launch(); + + var qs = DBOSTestAccess.getQueueService(dbos); + qs.setSpeedupForTest(); + dbos.registerQueue("firstQueue", QueueOptions.empty()); + + qs.pause(); + + var dedupeId = "dedupeId"; + var options = new StartWorkflowOptions().withQueue("firstQueue").withDeduplicationId(dedupeId); + var h1 = + dbos.startWorkflow( + () -> serviceQ.simpleQWorkflow("abc"), options.withDelay(Duration.ofHours(1))); + var s1 = h1.getStatus(); + assertEquals(WorkflowState.DELAYED, s1.status()); + assertEquals(dedupeId, s1.deduplicationId()); + + // Same dedupe ID should conflict even while DELAYED + assertThrows( + RuntimeException.class, + () -> dbos.startWorkflow(() -> serviceQ.simpleQWorkflow("def"), options)); + + // Clear the delay and run + dbos.setWorkflowDelay(h1.workflowId(), Instant.now().minusSeconds(1)); + qs.unpause(); + h1.getResult(); + + // After completion the dedupe ID is released — re-enqueue should succeed + var h2 = dbos.startWorkflow(() -> serviceQ.simpleQWorkflow("ghi"), options); + h2.getResult(); + } + + @Test + public void testPriority() throws Exception { + + ServiceQImpl impl = new ServiceQImpl(); + ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, impl); + dbos.launch(); + + var qs = DBOSTestAccess.getQueueService(dbos); + qs.setSpeedupForTest(); + dbos.registerQueue( + "firstQueue", + QueueOptions.setPriorityEnabled(true).andConcurrency(1).andWorkerConcurrency(1)); + + qs.pause(); + + var o1 = new StartWorkflowOptions().withQueue("firstQueue").withPriority(100); + var h1 = dbos.startWorkflow(() -> serviceQ.priorityWorkflow(100), o1); + + var o2 = new StartWorkflowOptions().withQueue("firstQueue").withPriority(50); + var h2 = dbos.startWorkflow(() -> serviceQ.priorityWorkflow(50), o2); + + var o3 = new StartWorkflowOptions().withQueue("firstQueue").withPriority(10); + var h3 = dbos.startWorkflow(() -> serviceQ.priorityWorkflow(10), o3); + + qs.unpause(); + + h1.getResult(); + h2.getResult(); + h3.getResult(); + + assertEquals(3, impl.queue.size()); + assertEquals(10, impl.queue.remove()); + assertEquals(50, impl.queue.remove()); + assertEquals(100, impl.queue.remove()); + } + + @Test + public void testQueuedMultipleWorkflows() throws Exception { + + ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl()); + dbos.launch(); + + var qs = DBOSTestAccess.getQueueService(dbos); + qs.setSpeedupForTest(); + dbos.registerQueue("firstQueue", QueueOptions.setConcurrency(1).andWorkerConcurrency(1)); + + qs.pause(); + Thread.sleep(2000); + + for (int i = 0; i < 5; i++) { + String id = "wfid" + i; + var input = "inputq" + i; + dbos.startWorkflow( + () -> serviceQ.simpleQWorkflow(input), + new StartWorkflowOptions(id).withQueue("firstQueue")); + } + + var input = new ListWorkflowsInput().withQueuesOnly(true).withLoadInput(true); + List wfs = dbos.listWorkflows(input); + + for (int i = 0; i < 5; i++) { + String id = "wfid" + i; + + assertEquals(id, wfs.get(i).workflowId()); + assertEquals(WorkflowState.ENQUEUED, wfs.get(i).status()); + } + + qs.unpause(); + + for (int i = 0; i < 5; i++) { + String id = "wfid" + i; + + var handle = dbos.retrieveWorkflow(id); + assertEquals(id, handle.workflowId()); + String result = (String) handle.getResult(); + assertEquals("inputq" + i + "inputq" + i, result); + assertEquals(WorkflowState.SUCCESS, handle.getStatus().status()); + } + } + + @Test + void testListQueuedWorkflow() throws Exception { + + ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl()); + dbos.launch(); + + var qs = DBOSTestAccess.getQueueService(dbos); + qs.setSpeedupForTest(); + dbos.registerQueue("firstQueue", QueueOptions.setConcurrency(1).andWorkerConcurrency(1)); + + qs.pause(); + + for (int i = 0; i < 5; i++) { + String id = "wfid" + i; + var input = "inputq" + i; + dbos.startWorkflow( + () -> serviceQ.simpleQWorkflow(input), + new StartWorkflowOptions(id).withQueue("firstQueue")); + Thread.sleep(100); + } + + var input = new ListWorkflowsInput().withQueuesOnly(true).withLoadInput(true); + List wfs = dbos.listWorkflows(input); + wfs.sort( + (a, b) -> { + return a.workflowId().compareTo(b.workflowId()); + }); + + for (int i = 0; i < 5; i++) { + String id = "wfid" + i; + + assertEquals(id, wfs.get(i).workflowId()); + assertEquals(WorkflowState.ENQUEUED, wfs.get(i).status()); + } + + wfs = dbos.listWorkflows(input.withQueueName("abc")); + assertEquals(0, wfs.size()); + + wfs = dbos.listWorkflows(input.withQueueName("firstQueue")); + assertEquals(5, wfs.size()); + + wfs = dbos.listWorkflows(input.withEndTime(Instant.now().minus(10, ChronoUnit.SECONDS))); + assertEquals(0, wfs.size()); + } + + @Test + public void multipleQueues() throws Exception { + + ServiceQ serviceQ1 = dbos.registerProxy(ServiceQ.class, new ServiceQImpl()); + ServiceI serviceI = dbos.registerProxy(ServiceI.class, new ServiceIImpl()); + dbos.launch(); + + var qs = DBOSTestAccess.getQueueService(dbos); + qs.setSpeedupForTest(); + dbos.registerQueue("firstQueue", QueueOptions.setConcurrency(1).andWorkerConcurrency(1)); + dbos.registerQueue("secondQueue", QueueOptions.setConcurrency(1).andWorkerConcurrency(1)); + + String id1 = "firstQ1234"; + String id2 = "second1234"; + + var options1 = new StartWorkflowOptions(id1).withQueue("firstQueue"); + WorkflowHandle handle1 = + dbos.startWorkflow(() -> serviceQ1.simpleQWorkflow("firstinput"), options1); + + var options2 = new StartWorkflowOptions(id2).withQueue("secondQueue"); + WorkflowHandle handle2 = dbos.startWorkflow(() -> serviceI.workflowI(25), options2); + + assertEquals(id1, handle1.workflowId()); + String result = handle1.getResult(); + assertEquals("firstQueue", handle1.getStatus().queueName()); + assertEquals("firstinputfirstinput", result); + assertEquals(WorkflowState.SUCCESS, handle1.getStatus().status()); + + assertEquals(id2, handle2.workflowId()); + Integer result2 = (Integer) handle2.getResult(); + assertEquals("secondQueue", handle2.getStatus().queueName()); + assertEquals(50, result2); + assertEquals(WorkflowState.SUCCESS, handle2.getStatus().status()); + } + + @Test + public void testLimiter() throws Exception { + + int limit = 5; + double periodSec = 1.8; + Duration period = Duration.ofMillis((long) (periodSec * 1000)); + + ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl()); + dbos.launch(); + + var qs = DBOSTestAccess.getQueueService(dbos); + qs.setSpeedupForTest(); + dbos.registerQueue( + "limitQueue", + QueueOptions.setRateLimit(limit, period).andConcurrency(1).andWorkerConcurrency(1)); + Thread.sleep(1000); + + int numWaves = 3; + int numTasks = numWaves * limit; + List> handles = new ArrayList<>(); + List times = new ArrayList<>(); + + for (int i = 0; i < numTasks; i++) { + String id = "id" + i; + var options = new StartWorkflowOptions(id).withQueue("limitQueue"); + WorkflowHandle handle = + dbos.startWorkflow(() -> serviceQ.limitWorkflow("abc", "123"), options); + handles.add(handle); + } + + for (WorkflowHandle h : handles) { + double result = h.getResult(); + logger.info(String.valueOf(result)); + times.add(result); + } + + double waveTolerance = 1.0; + for (int wave = 0; wave < numWaves; wave++) { + for (int i = wave * limit; i < (wave + 1) * limit - 1; i++) { + double diff = times.get(i + 1) - times.get(i); + logger.info(String.format("Wave %d, Task %d-%d: Time diff %.3f", wave, i, i + 1, diff)); + assertTrue( + diff < waveTolerance, + String.format( + "Wave %d: Tasks %d and %d should start close together. Diff: %.3f", + wave, i, i + 1, diff)); + } + } + logger.info("Verified intra-wave timing."); + + double periodTolerance = 0.5; + for (int wave = 0; wave < numWaves - 1; wave++) { + double startOfNextWave = times.get(limit * (wave + 1)); + double startOfCurrentWave = times.get(limit * wave); + double gap = startOfNextWave - startOfCurrentWave; + logger.info(String.format("Gap between Wave %d and %d: %.3f", wave, wave + 1, gap)); + assertTrue( + gap > periodSec - periodTolerance, + String.format( + "Gap between wave %d and %d should be at least %.3f. Actual: %.3f", + wave, wave + 1, periodSec - periodTolerance, gap)); + assertTrue( + gap < periodSec + periodTolerance, + String.format( + "Gap between wave %d and %d should be at most %.3f. Actual: %.3f", + wave, wave + 1, periodSec + periodTolerance, gap)); + } + + for (WorkflowHandle h : handles) { + assertEquals(WorkflowState.SUCCESS, h.getStatus().status()); + } + } + + @Test + public void testWorkerConcurrency() throws Exception { + + dbos.launch(); + var systemDatabase = DBOSTestAccess.getSystemDatabase(dbos); + var dbosExecutor = DBOSTestAccess.getDbosExecutor(dbos); + var queueService = DBOSTestAccess.getQueueService(dbos); + + dbos.registerQueue("QwithWCLimit", QueueOptions.setConcurrency(3).andWorkerConcurrency(2)); + Queue qwithWCLimit = dbos.findQueue("QwithWCLimit").get(); + + String executorId = dbosExecutor.executorId(); + String appVersion = dbosExecutor.appVersion(); + + queueService.close(); + while (!queueService.isStopped()) { + Thread.sleep(2000); + logger.info("Waiting for queueService to stop"); + } + + var serArgs = SerializationUtil.serializeValue(new Object[] {"ORD-12345"}, null, null); + var builder = + new WorkflowStatusInternalBuilder() + .workflowName("OrderProcessingWorkflow") + .className("com.example.workflows.OrderWorkflow") + .instanceName("prod-config") + .authenticatedUser("user123@example.com") + .assumedRole("admin") + .authenticatedRoles(new String[] {"admin", "operator"}) + .queueName("QwithWCLimit") + .executorId(executorId) + .appVersion(appVersion) + .appId("order-app-123") + .timeout(Duration.ofMillis(300000)) + .deadline(Instant.ofEpochMilli(System.currentTimeMillis() + 2400000)) + .priority(1) + .inputs(serArgs.serializedValue()); + + for (int i = 0; i < 4; i++) { + String wfid = "id" + i; + var status = builder.workflowId(wfid).deduplicationId("dedup" + i).build(); + systemDatabase.initWorkflowStatus(status, null, false, false); + } + + var readBack = systemDatabase.listWorkflows(new ListWorkflowsInput("id0")).get(0); + assertArrayEquals(new String[] {"admin", "operator"}, readBack.authenticatedRoles()); + + List idsToRun = + systemDatabase.getAndStartQueuedWorkflows(qwithWCLimit, executorId, appVersion, null); + + assertEquals(2, idsToRun.size()); + + // run the same above 2 are in Pending. + // So no de queueing + idsToRun = + systemDatabase.getAndStartQueuedWorkflows(qwithWCLimit, executorId, appVersion, null); + assertEquals(0, idsToRun.size()); + + // mark the first 2 as success + DBUtils.updateAllWorkflowStates( + dataSource, WorkflowState.PENDING.name(), WorkflowState.SUCCESS.name()); + + // next 2 get dequeued + idsToRun = + systemDatabase.getAndStartQueuedWorkflows(qwithWCLimit, executorId, appVersion, null); + assertEquals(2, idsToRun.size()); + + DBUtils.updateAllWorkflowStates( + dataSource, WorkflowState.PENDING.name(), WorkflowState.SUCCESS.name()); + idsToRun = + systemDatabase.getAndStartQueuedWorkflows( + qwithWCLimit, Constants.DEFAULT_EXECUTORID, Constants.DEFAULT_APP_VERSION, null); + assertEquals(0, idsToRun.size()); + } + + @Test + public void testGlobalConcurrency() throws Exception { + + dbos.launch(); + var systemDatabase = DBOSTestAccess.getSystemDatabase(dbos); + var dbosExecutor = DBOSTestAccess.getDbosExecutor(dbos); + var queueService = DBOSTestAccess.getQueueService(dbos); + + dbos.registerQueue("QwithWCLimit", QueueOptions.setConcurrency(3).andWorkerConcurrency(2)); + Queue qwithWCLimit = dbos.findQueue("QwithWCLimit").get(); + + String executorId = dbosExecutor.executorId(); + String appVersion = dbosExecutor.appVersion(); + + queueService.close(); + while (!queueService.isStopped()) { + Thread.sleep(2000); + logger.info("Waiting for queueService to stop"); + } + + var builder = + new WorkflowStatusInternalBuilder() + .workflowName("OrderProcessingWorkflow") + .className("com.example.workflows.OrderWorkflow") + .instanceName("prod-config") + .authenticatedUser("user123@example.com") + .assumedRole("admin") + .authenticatedRoles(new String[] {"admin", "operator"}) + .queueName("QwithWCLimit") + .executorId(executorId) + .appVersion(appVersion) + .appId("order-app-123") + .timeout(Duration.ofMillis(300000)) + .deadline(Instant.ofEpochMilli(System.currentTimeMillis() + 2400000)) + .priority(1) + .inputs("{\"orderId\":\"ORD-12345\"}"); + + // executor1 + for (int i = 0; i < 2; i++) { + String wfid = "id" + i; + var status = builder.workflowId(wfid).deduplicationId("dedup" + i).build(); + systemDatabase.initWorkflowStatus(status, null, false, false); + } + + // executor2 + String executor2 = "remote"; + for (int i = 2; i < 5; i++) { + String wfid = "id" + i; + var status = + builder.workflowId(wfid).deduplicationId("dedup" + i).executorId(executor2).build(); + systemDatabase.initWorkflowStatus(status, null, false, false); + + DBUtils.setWorkflowState(dataSource, wfid, WorkflowState.PENDING.name()); + } + + List idsToRun = + systemDatabase.getAndStartQueuedWorkflows(qwithWCLimit, executorId, appVersion, null); + // 0 because global concurrency limit is reached + assertEquals(0, idsToRun.size()); + + DBUtils.updateAllWorkflowStates( + dataSource, WorkflowState.PENDING.name(), WorkflowState.SUCCESS.name()); + idsToRun = + systemDatabase.getAndStartQueuedWorkflows( + qwithWCLimit, + // executorId, + executor2, + appVersion, + null); + assertEquals(2, idsToRun.size()); + } + + @Test + public void testenQueueWF() throws Exception { + + ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl()); + dbos.launch(); + + var qs = DBOSTestAccess.getQueueService(dbos); + qs.setSpeedupForTest(); + dbos.registerQueue("firstQueue", QueueOptions.empty()); + + String id = "q1234"; + + var option = new StartWorkflowOptions(id).withQueue("firstQueue"); + WorkflowHandle handle = + dbos.startWorkflow(() -> serviceQ.simpleQWorkflow("inputq"), option); + + assertEquals(id, handle.workflowId()); + String result = handle.getResult(); + assertEquals("inputqinputq", result); + } + + @Test + public void testQueueConcurrencyUnderRecovery() throws Exception { + ConcurrencyTestServiceImpl impl = new ConcurrencyTestServiceImpl(); + ConcurrencyTestService service = dbos.registerProxy(ConcurrencyTestService.class, impl); + dbos.launch(); + + var qs = DBOSTestAccess.getQueueService(dbos); + qs.setSpeedupForTest(); + dbos.registerQueue("test_queue", QueueOptions.setConcurrency(2)); + + var opt1 = new StartWorkflowOptions("wf1").withQueue("test_queue"); + var handle1 = dbos.startWorkflow(() -> service.blockedWorkflow(0), opt1); + + var opt2 = new StartWorkflowOptions("wf2").withQueue("test_queue"); + var handle2 = dbos.startWorkflow(() -> service.blockedWorkflow(1), opt2); + + var opt3 = new StartWorkflowOptions("wf3").withQueue("test_queue"); + var handle3 = dbos.startWorkflow(() -> service.noopWorkflow(2), opt3); + + // each call to blockedWorkflow releases the semaphore once, + // so block waiting on both calls to release + impl.wfSemaphore.acquire(2); + + assertEquals(2, impl.counter.get()); + assertEquals(WorkflowState.PENDING, handle1.getStatus().status()); + assertEquals(WorkflowState.PENDING, handle2.getStatus().status()); + assertEquals(WorkflowState.ENQUEUED, handle3.getStatus().status()); + + // update WF3 to appear as if it's from a different executor + String sql = + "UPDATE dbos.workflow_status SET status = ?, executor_id = ? where workflow_uuid = ?;"; + + try (Connection connection = DBUtils.getConnection(dbosConfig); + PreparedStatement pstmt = connection.prepareStatement(sql)) { + + pstmt.setString(1, WorkflowState.PENDING.name()); + pstmt.setString(2, "other"); + pstmt.setString(3, opt3.workflowId()); + + // Execute the update and get the number of rows affected + int rowsAffected = pstmt.executeUpdate(); + assertEquals(1, rowsAffected); + } + + var executor = DBOSTestAccess.getDbosExecutor(dbos); + List> otherHandles = executor.recoverPendingWorkflows(List.of("other")); + assertEquals(WorkflowState.PENDING, handle1.getStatus().status()); + assertEquals(WorkflowState.PENDING, handle2.getStatus().status()); + assertEquals(1, otherHandles.size()); + assertEquals(otherHandles.get(0).workflowId(), handle3.workflowId()); + assertEquals(WorkflowState.ENQUEUED, handle3.getStatus().status()); + + // Pause the listener before recovery so it can't race the ENQUEUED status checks below. + qs.pause(); + List> localHandles = executor.recoverPendingWorkflows(List.of("local")); + assertEquals(2, localHandles.size()); + List expectedWorkflowIds = List.of(handle1.workflowId(), handle2.workflowId()); + assertTrue(expectedWorkflowIds.contains(localHandles.get(0).workflowId())); + assertTrue(expectedWorkflowIds.contains(localHandles.get(1).workflowId())); + + assertEquals(2, impl.counter.get()); + // Recovery sets back to enqueued. + // The enqueued run will get skipped (first run is still blocked) + assertEquals(WorkflowState.ENQUEUED, handle1.getStatus().status()); + assertEquals(WorkflowState.ENQUEUED, handle2.getStatus().status()); + assertEquals(WorkflowState.ENQUEUED, handle3.getStatus().status()); + + qs.unpause(); + impl.latch.countDown(); + assertEquals(0, handle1.getResult()); + assertEquals(1, handle2.getResult()); + assertEquals(2, handle3.getResult()); + assertEquals("local", handle3.getStatus().executorId()); + + assertTrue(DBUtils.queueEntriesAreCleanedUp(dataSource)); + } + + @Test + public void testListenQueue() throws Exception { + var config = dbosConfig.withListenQueue("queueOne"); + try (var dbos = new DBOS(config)) { + + ServiceQ serviceQ = dbos.registerProxy(ServiceQ.class, new ServiceQImpl()); + dbos.launch(); + + var qs = DBOSTestAccess.getQueueService(dbos); + qs.setSpeedupForTest(); + dbos.registerQueue("queueOne", QueueOptions.empty()); + dbos.registerQueue("queueTwo", QueueOptions.empty()); + + var h2 = + dbos.startWorkflow( + () -> serviceQ.simpleQWorkflow("two"), + new StartWorkflowOptions().withQueue("queueTwo")); + var h1 = + dbos.startWorkflow( + () -> serviceQ.simpleQWorkflow("one"), + new StartWorkflowOptions().withQueue("queueOne")); + + Thread.sleep(3000); + assertEquals("oneone", h1.getResult()); + assertEquals(WorkflowState.ENQUEUED, h2.getStatus().status()); + } + } + + private static void awaitCondition(BooleanSupplier condition) throws InterruptedException { + long deadline = System.currentTimeMillis() + 2000; + while (!condition.getAsBoolean()) { + if (System.currentTimeMillis() > deadline) + throw new AssertionError("Condition not met within 2s"); + Thread.sleep(50); + } + } +} diff --git a/transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java b/transact/src/test/java/dev/dbos/transact/queue/StaticQueuesTest.java similarity index 96% rename from transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java rename to transact/src/test/java/dev/dbos/transact/queue/StaticQueuesTest.java index 9fdb3590..b4b40468 100644 --- a/transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java +++ b/transact/src/test/java/dev/dbos/transact/queue/StaticQueuesTest.java @@ -34,9 +34,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class QueuesTest { +/** + * Tests for the static (in-memory) queue registration path. All queues here are registered via + * {@code dbos.registerQueue(Queue)} before launch, which exercises the pre-launch static listener + * code path. See {@link QueuesTest} for the equivalent tests using database-backed dynamic queues. + */ +public class StaticQueuesTest { - private static final Logger logger = LoggerFactory.getLogger(QueuesTest.class); + private static final Logger logger = LoggerFactory.getLogger(StaticQueuesTest.class); @AutoClose final PgContainer pgContainer = new PgContainer(); @@ -336,7 +341,8 @@ public void multipleQueues() throws Exception { public void testLimiter() throws Exception { int limit = 5; - double period = 1.8; // + double periodSec = 1.8; + Duration period = Duration.ofMillis((long) (periodSec * 1000)); Queue limitQ = new Queue("limitQueue") @@ -392,15 +398,15 @@ public void testLimiter() throws Exception { double gap = startOfNextWave - startOfCurrentWave; logger.info(String.format("Gap between Wave %d and %d: %.3f", wave, wave + 1, gap)); assertTrue( - gap > period - periodTolerance, + gap > periodSec - periodTolerance, String.format( "Gap between wave %d and %d should be at least %.3f. Actual: %.3f", - wave, wave + 1, period - periodTolerance, gap)); + wave, wave + 1, periodSec - periodTolerance, gap)); assertTrue( - gap < period + periodTolerance, + gap < periodSec + periodTolerance, String.format( "Gap between wave %d and %d should be at most %.3f. Actual: %.3f", - wave, wave + 1, period + periodTolerance, gap)); + wave, wave + 1, periodSec + periodTolerance, gap)); } for (WorkflowHandle h : handles) { diff --git a/transact/src/test/java/dev/dbos/transact/utils/PgContainer.java b/transact/src/test/java/dev/dbos/transact/utils/PgContainer.java index 6c029791..b623f87d 100644 --- a/transact/src/test/java/dev/dbos/transact/utils/PgContainer.java +++ b/transact/src/test/java/dev/dbos/transact/utils/PgContainer.java @@ -76,7 +76,8 @@ public static void truncateDbosTables(Connection conn) throws SQLException { "dbos".event_dispatch_kv, "dbos".streams, "dbos".application_versions, - "dbos".workflow_schedules + "dbos".workflow_schedules, + "dbos".queues CASCADE """; try (var stmt = conn.createStatement()) {