Skip to content

Commit 9ddfe70

Browse files
authored
Dyanmic Queue Support (#391)
# PR Summary: Dynamic Queues (`devhawk/dynamic-queues`) ## New public API (`DBOS` and `DBOSClient`) - `registerQueue(name, QueueOptions)` — creates or updates a queue persisted in the database - `registerQueue(name, QueueOptions, QueueConflictResolution)` — same with explicit conflict handling - `updateQueue(name, QueueOptions)` — partial update; only present fields are written, absent fields left unchanged - `findQueue(name)` — look up a queue by name - `listQueues()` — list all database-backed queues - `deleteQueue(name)` — remove a queue from the database ## New types - **`QueueOptions`** — immutable record with a `Field<T>` for each queue property, supporting three states: absent (don't touch), present-with-value, or present-with-null (clear). Fluent builder API via `set*()` static factories and `and*()` chainers. - **`QueueConflictResolution`** — enum controlling `registerQueue` behavior: `ALWAYS_UPDATE`, `NEVER_UPDATE`, or `UPDATE_IF_LATEST_VERSION` (the last is only available on `DBOS`, not `DBOSClient`, since clients have no concept of app version). - **`Field<T>`** — sealed interface (`Absent` / `Present`) for three-state optional values used by `QueueOptions`. ## `Queue` record changes - Added `pollingInterval` field (defaults to 1 second) with validation - `withRateLimit(int, double)` replaced by `withRateLimit(int, long, TimeUnit)` for type safety ## Runtime: dynamic queue polling (`QueueService`) - New `pollDynamicQueues()` supervisor runs every second, reads all queues from the database, and starts/stops `QueueListenerTask` instances accordingly - Static queues take precedence: if a DB queue shares a name with a static queue, the static config wins and a warning is logged - `listenQueues` filter respected for dynamic queues too ## `SchedulerService` refactor - `pollWorkflowSchedulesImpl` inlined; scheduler loop simplified ## Conductor protocol - New `get_queue` / `list_queues` request+response message types and `QueueOutput` DTO, so the conductor can serve queue info to clients ## Schema / DAO - `QueuesDAO` — full implementation: `upsertQueue`, `updateQueue`, `findQueue`, `listQueues`, `deleteQueue`, `getQueuePartitions` - Migration updated to use `INT4` (explicitly 32-bit) for `concurrency`, `worker_concurrency`, and `rate_limit_max` columns — fixes a CRDB incompatibility where `INTEGER` defaults to 64-bit `int8` fixes #372
1 parent 9e90b16 commit 9ddfe70

27 files changed

Lines changed: 2640 additions & 273 deletions

transact/src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import dev.dbos.transact.workflow.ForkOptions;
1515
import dev.dbos.transact.workflow.ListWorkflowsInput;
1616
import dev.dbos.transact.workflow.Queue;
17+
import dev.dbos.transact.workflow.QueueConflictResolution;
18+
import dev.dbos.transact.workflow.QueueOptions;
1719
import dev.dbos.transact.workflow.ScheduleStatus;
1820
import dev.dbos.transact.workflow.SerializationStrategy;
1921
import dev.dbos.transact.workflow.Step;
@@ -164,6 +166,76 @@ public void registerQueues(@NonNull Queue... queues) {
164166
}
165167
}
166168

169+
/**
170+
* Register a database-backed dynamic queue. Must be called after launch. Queue configuration can
171+
* be updated at runtime via {@link #updateQueue(String, QueueOptions)}.
172+
*
173+
* <p>Uses {@link QueueConflictResolution#UPDATE_IF_LATEST_VERSION} by default: the existing
174+
* configuration is overwritten only if this executor is running the latest application version.
175+
*
176+
* @param name Queue name
177+
* @param options Initial configuration options
178+
*/
179+
public void registerQueue(@NonNull String name, @NonNull QueueOptions options) {
180+
ensureLaunched("registerQueue")
181+
.registerDynamicQueue(name, options, QueueConflictResolution.UPDATE_IF_LATEST_VERSION);
182+
}
183+
184+
/**
185+
* Register a database-backed dynamic queue. Must be called after launch. Queue configuration can
186+
* be updated at runtime via {@link #updateQueue(String, QueueOptions)}.
187+
*
188+
* @param name Queue name
189+
* @param options Initial configuration options
190+
* @param onConflict How to handle an existing queue with the same name
191+
*/
192+
public void registerQueue(
193+
@NonNull String name,
194+
@NonNull QueueOptions options,
195+
@NonNull QueueConflictResolution onConflict) {
196+
ensureLaunched("registerQueue").registerDynamicQueue(name, options, onConflict);
197+
}
198+
199+
/**
200+
* Update the configuration of a database-backed dynamic queue. Must be called after launch. Only
201+
* fields that are present in {@code options} are written; absent fields are left unchanged.
202+
*
203+
* @param name Queue name
204+
* @param options Fields to update
205+
*/
206+
public void updateQueue(@NonNull String name, @NonNull QueueOptions options) {
207+
ensureLaunched("updateQueue").updateDynamicQueue(name, options);
208+
}
209+
210+
/**
211+
* Retrieve a database-backed dynamic queue by name. Must be called after launch.
212+
*
213+
* @param name Queue name
214+
* @return the queue if it exists in the database, or empty
215+
*/
216+
public @NonNull Optional<Queue> findQueue(@NonNull String name) {
217+
return ensureLaunched("findQueue").findDynamicQueue(name);
218+
}
219+
220+
/**
221+
* Delete a database-backed dynamic queue. Must be called after launch.
222+
*
223+
* @param name Queue name
224+
* @return true if the queue was deleted, false if it did not exist
225+
*/
226+
public boolean deleteQueue(@NonNull String name) {
227+
return ensureLaunched("deleteQueue").deleteDynamicQueue(name);
228+
}
229+
230+
/**
231+
* List all database-backed dynamic queues. Must be called after launch.
232+
*
233+
* @return list of all queues currently registered in the database
234+
*/
235+
public @NonNull List<Queue> listQueues() {
236+
return ensureLaunched("listQueues").listDynamicQueues();
237+
}
238+
167239
/**
168240
* Register all workflows and steps in the provided class instance
169241
*
@@ -311,7 +383,7 @@ private DBOSExecutor ensureLaunched(String caller) {
311383
* @return Optional containing the queue definition for given `queueName`, or empty if not found
312384
*/
313385
public @NonNull Optional<Queue> getQueue(@NonNull String queueName) {
314-
return ensureLaunched("getQueue").getQueue(queueName);
386+
return ensureLaunched("getQueue").findStaticQueue(queueName);
315387
}
316388

317389
/**

transact/src/main/java/dev/dbos/transact/DBOSClient.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
import dev.dbos.transact.json.SerializationUtil;
1414
import dev.dbos.transact.workflow.ForkOptions;
1515
import dev.dbos.transact.workflow.ListWorkflowsInput;
16+
import dev.dbos.transact.workflow.Queue;
17+
import dev.dbos.transact.workflow.QueueConflictResolution;
18+
import dev.dbos.transact.workflow.QueueOptions;
1619
import dev.dbos.transact.workflow.ScheduleStatus;
1720
import dev.dbos.transact.workflow.SerializationStrategy;
1821
import dev.dbos.transact.workflow.StepInfo;
@@ -1097,4 +1100,78 @@ public void setWorkflowDelay(@NonNull String workflowId, @NonNull Instant delayU
10971100
Objects.requireNonNull(delayUntil, "delayUntil must not be null"));
10981101
systemDatabase.setWorkflowDelay(workflowId, wfDelay);
10991102
}
1103+
1104+
/**
1105+
* Register a database-backed dynamic queue. Uses {@link QueueConflictResolution#ALWAYS_UPDATE} by
1106+
* default.
1107+
*
1108+
* @param name Queue name
1109+
* @param options Configuration options
1110+
*/
1111+
public void registerQueue(@NonNull String name, @NonNull QueueOptions options) {
1112+
registerQueue(name, options, QueueConflictResolution.ALWAYS_UPDATE);
1113+
}
1114+
1115+
/**
1116+
* Register a database-backed dynamic queue.
1117+
*
1118+
* <p>{@link QueueConflictResolution#UPDATE_IF_LATEST_VERSION} is not supported by {@code
1119+
* DBOSClient} because clients are not associated with an application version. Use {@link
1120+
* QueueConflictResolution#ALWAYS_UPDATE} or {@link QueueConflictResolution#NEVER_UPDATE}.
1121+
*
1122+
* @param name Queue name
1123+
* @param options Configuration options
1124+
* @param onConflict How to handle an existing queue with the same name
1125+
*/
1126+
public void registerQueue(
1127+
@NonNull String name,
1128+
@NonNull QueueOptions options,
1129+
@NonNull QueueConflictResolution onConflict) {
1130+
if (onConflict == QueueConflictResolution.UPDATE_IF_LATEST_VERSION) {
1131+
throw new IllegalArgumentException(
1132+
"DBOSClient.registerQueue does not support UPDATE_IF_LATEST_VERSION because clients are"
1133+
+ " not associated with an application version. Use ALWAYS_UPDATE or NEVER_UPDATE.");
1134+
}
1135+
systemDatabase.upsertQueue(name, options, onConflict == QueueConflictResolution.ALWAYS_UPDATE);
1136+
}
1137+
1138+
/**
1139+
* Update the configuration of a database-backed dynamic queue. Only fields that are present in
1140+
* {@code options} are written; absent fields are left unchanged.
1141+
*
1142+
* @param name Queue name
1143+
* @param options Fields to update
1144+
*/
1145+
public void updateQueue(@NonNull String name, @NonNull QueueOptions options) {
1146+
systemDatabase.updateQueue(name, options);
1147+
}
1148+
1149+
/**
1150+
* Retrieve a database-backed dynamic queue by name.
1151+
*
1152+
* @param name Queue name
1153+
* @return the queue if it exists in the database, or empty
1154+
*/
1155+
public @NonNull Optional<Queue> findQueue(@NonNull String name) {
1156+
return systemDatabase.findQueue(name);
1157+
}
1158+
1159+
/**
1160+
* List all database-backed dynamic queues.
1161+
*
1162+
* @return list of all queues currently registered in the database
1163+
*/
1164+
public @NonNull List<Queue> listQueues() {
1165+
return systemDatabase.listQueues();
1166+
}
1167+
1168+
/**
1169+
* Delete a database-backed dynamic queue.
1170+
*
1171+
* @param name Queue name
1172+
* @return true if the queue was deleted, false if it did not exist
1173+
*/
1174+
public boolean deleteQueue(@NonNull String name) {
1175+
return systemDatabase.deleteQueue(name);
1176+
}
11001177
}

transact/src/main/java/dev/dbos/transact/admin/AdminServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ private void deactivate(HttpExchange exchange) throws IOException {
140140
}
141141

142142
private void workflowQueuesMetadata(HttpExchange exchange) throws IOException {
143-
var queues = dbosExecutor.getQueues();
143+
var queues = dbosExecutor.getStaticQueues();
144144
sendMappedJson(exchange, 200, queues);
145145
}
146146

transact/src/main/java/dev/dbos/transact/conductor/Conductor.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import dev.dbos.transact.workflow.ExportedWorkflow;
77
import dev.dbos.transact.workflow.ForkOptions;
88
import dev.dbos.transact.workflow.ListWorkflowsInput;
9+
import dev.dbos.transact.workflow.Queue;
910
import dev.dbos.transact.workflow.StepInfo;
1011
import dev.dbos.transact.workflow.WorkflowHandle;
1112
import dev.dbos.transact.workflow.WorkflowSchedule;
@@ -702,6 +703,7 @@ CompletableFuture<BaseResponse> getResponseAsync(BaseMessage message, WebSocket
702703
case EXPORT_WORKFLOW -> handleExportWorkflow(this, message, ws);
703704
case FORK_WORKFLOW -> handleFork(this, message);
704705
case GET_METRICS -> handleGetMetrics(this, message);
706+
case GET_QUEUE -> handleGetQueue(this, message);
705707
case GET_SCHEDULE -> handleGetSchedule(this, message);
706708
case GET_WORKFLOW_AGGREGATES -> handleGetWorkflowAggregates(this, message);
707709
case GET_WORKFLOW_EVENTS -> handleGetWorkflowEvents(this, message);
@@ -711,6 +713,7 @@ CompletableFuture<BaseResponse> getResponseAsync(BaseMessage message, WebSocket
711713
case IMPORT_WORKFLOW -> handleImportWorkflow(this, message);
712714
case LIST_APPLICATION_VERSIONS -> handleListApplicationVersions(this, message);
713715
case LIST_QUEUED_WORKFLOWS -> handleListQueuedWorkflows(this, message);
716+
case LIST_QUEUES -> handleListQueues(this, message);
714717
case LIST_SCHEDULES -> handleListSchedules(this, message);
715718
case LIST_STEPS -> handleListSteps(this, message);
716719
case LIST_WORKFLOWS -> handleListWorkflows(this, message);
@@ -1371,6 +1374,35 @@ static CompletableFuture<BaseResponse> handleGetSchedule(
13711374
});
13721375
}
13731376

1377+
static CompletableFuture<BaseResponse> handleListQueues(
1378+
Conductor conductor, BaseMessage message) {
1379+
return CompletableFuture.supplyAsync(
1380+
() -> {
1381+
try {
1382+
List<Queue> queues = conductor.systemDatabase.listQueues();
1383+
List<QueueOutput> output = queues.stream().map(QueueOutput::from).toList();
1384+
return new ListQueuesResponse(message, output);
1385+
} catch (Exception e) {
1386+
logger.error("Exception encountered when listing queues", e);
1387+
return new ListQueuesResponse(message, e.getMessage());
1388+
}
1389+
});
1390+
}
1391+
1392+
static CompletableFuture<BaseResponse> handleGetQueue(Conductor conductor, BaseMessage message) {
1393+
return CompletableFuture.supplyAsync(
1394+
() -> {
1395+
GetQueueRequest request = (GetQueueRequest) message;
1396+
try {
1397+
var queue = conductor.systemDatabase.findQueue(request.name);
1398+
return new GetQueueResponse(request, queue.map(QueueOutput::from).orElse(null));
1399+
} catch (Exception e) {
1400+
logger.error("Exception encountered when getting queue {}", request.name, e);
1401+
return new GetQueueResponse(request, e.getMessage());
1402+
}
1403+
});
1404+
}
1405+
13741406
static CompletableFuture<BaseResponse> handlePauseSchedule(
13751407
Conductor conductor, BaseMessage message) {
13761408
return CompletableFuture.supplyAsync(

transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
@JsonSubTypes.Type(value = ExportWorkflowRequest.class, name = "export_workflow"),
2020
@JsonSubTypes.Type(value = ForkWorkflowRequest.class, name = "fork_workflow"),
2121
@JsonSubTypes.Type(value = GetMetricsRequest.class, name = "get_metrics"),
22+
@JsonSubTypes.Type(value = GetQueueRequest.class, name = "get_queue"),
2223
@JsonSubTypes.Type(value = GetScheduleRequest.class, name = "get_schedule"),
2324
@JsonSubTypes.Type(value = GetWorkflowAggregatesRequest.class, name = "get_workflow_aggregates"),
2425
@JsonSubTypes.Type(value = GetWorkflowEventsRequest.class, name = "get_workflow_events"),
@@ -32,6 +33,7 @@
3233
value = ListApplicationVersionsRequest.class,
3334
name = "list_application_versions"),
3435
@JsonSubTypes.Type(value = ListQueuedWorkflowsRequest.class, name = "list_queued_workflows"),
36+
@JsonSubTypes.Type(value = ListQueuesRequest.class, name = "list_queues"),
3537
@JsonSubTypes.Type(value = ListSchedulesRequest.class, name = "list_schedules"),
3638
@JsonSubTypes.Type(value = ListStepsRequest.class, name = "list_steps"),
3739
@JsonSubTypes.Type(value = ListWorkflowsRequest.class, name = "list_workflows"),
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package dev.dbos.transact.conductor.protocol;
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
5+
@JsonIgnoreProperties(ignoreUnknown = true)
6+
public class GetQueueRequest extends BaseMessage {
7+
public String name;
8+
9+
public GetQueueRequest() {}
10+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package dev.dbos.transact.conductor.protocol;
2+
3+
public class GetQueueResponse extends BaseResponse {
4+
public QueueOutput output;
5+
6+
public GetQueueResponse() {}
7+
8+
public GetQueueResponse(BaseMessage message, QueueOutput output) {
9+
super(message.type, message.request_id);
10+
this.output = output;
11+
}
12+
13+
public GetQueueResponse(BaseMessage message, String errorMessage) {
14+
super(message.type, message.request_id, errorMessage);
15+
this.output = null;
16+
}
17+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package dev.dbos.transact.conductor.protocol;
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
5+
@JsonIgnoreProperties(ignoreUnknown = true)
6+
public class ListQueuesRequest extends BaseMessage {
7+
public ListQueuesRequest() {}
8+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package dev.dbos.transact.conductor.protocol;
2+
3+
import java.util.Collections;
4+
import java.util.List;
5+
6+
public class ListQueuesResponse extends BaseResponse {
7+
public List<QueueOutput> output;
8+
9+
public ListQueuesResponse() {}
10+
11+
public ListQueuesResponse(BaseMessage message, List<QueueOutput> output) {
12+
super(message.type, message.request_id);
13+
this.output = output;
14+
}
15+
16+
public ListQueuesResponse(BaseMessage message, String errorMessage) {
17+
super(message.type, message.request_id, errorMessage);
18+
this.output = Collections.emptyList();
19+
}
20+
}

transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ public enum MessageType {
1010
EXPORT_WORKFLOW("export_workflow"),
1111
FORK_WORKFLOW("fork_workflow"),
1212
GET_METRICS("get_metrics"),
13+
GET_QUEUE("get_queue"),
1314
GET_SCHEDULE("get_schedule"),
1415
GET_WORKFLOW_AGGREGATES("get_workflow_aggregates"),
1516
GET_WORKFLOW_EVENTS("get_workflow_events"),
@@ -19,6 +20,7 @@ public enum MessageType {
1920
IMPORT_WORKFLOW("import_workflow"),
2021
LIST_APPLICATION_VERSIONS("list_application_versions"),
2122
LIST_QUEUED_WORKFLOWS("list_queued_workflows"),
23+
LIST_QUEUES("list_queues"),
2224
LIST_SCHEDULES("list_schedules"),
2325
LIST_STEPS("list_steps"),
2426
LIST_WORKFLOWS("list_workflows"),

0 commit comments

Comments
 (0)