Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2d7953c
added initial queues DAO methods
devhawk May 20, 2026
c4c5348
updateQueue
devhawk May 20, 2026
24e044c
dynamic queues conductor protocol
devhawk May 20, 2026
be06865
WIP
devhawk May 21, 2026
6cd677f
spotless
devhawk May 21, 2026
3e118ba
wip
devhawk May 21, 2026
1bf58d0
QueueConflictResolution
devhawk May 21, 2026
846445f
fix tests
devhawk May 21, 2026
c31c452
use queue polling interval
devhawk May 21, 2026
755a9d9
inline pollWorkflowSchedulesImpl
devhawk May 21, 2026
ed188a5
pollDynamicQueues + tests
devhawk May 21, 2026
5021923
spotless
devhawk May 21, 2026
1c4d927
cleanup
devhawk May 21, 2026
fe9658f
spotless
devhawk May 21, 2026
74d5f2a
moar cleanup
devhawk May 22, 2026
f3de1a0
add more tests + fix truncateDbosTables
devhawk May 22, 2026
3a55834
fix queues int columns for CRDB
devhawk May 22, 2026
529fa46
Merge branch 'main' into devhawk/dynamic-queues
devhawk May 22, 2026
dedd622
copilot feedback
devhawk May 22, 2026
b27edd6
Merge branch 'main' into devhawk/dynamic-queues
devhawk May 22, 2026
0ed1106
Merge branch 'main' into devhawk/dynamic-queues
devhawk May 22, 2026
3330cd7
Merge branch 'main' into devhawk/dynamic-queues
devhawk May 22, 2026
0e51c0b
more tests
devhawk May 24, 2026
e1cc7dc
in memory cache of dynamic queues for validation
devhawk May 26, 2026
73340cd
add tests that dynamic queue updates are seen by queue service
devhawk May 26, 2026
d4819de
fix flaky test
devhawk May 26, 2026
1a611f2
Merge branch 'main' into devhawk/dynamic-queues
devhawk May 26, 2026
a4ac8a1
merge all dynamic queue tests into a single class
devhawk May 26, 2026
05e925b
Merge branch 'main' into devhawk/dynamic-queues
devhawk May 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion transact/src/main/java/dev/dbos/transact/DBOS.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import dev.dbos.transact.workflow.ForkOptions;
import dev.dbos.transact.workflow.ListWorkflowsInput;
import dev.dbos.transact.workflow.Queue;
import dev.dbos.transact.workflow.QueueConflictResolution;
import dev.dbos.transact.workflow.QueueOptions;
import dev.dbos.transact.workflow.ScheduleStatus;
import dev.dbos.transact.workflow.SerializationStrategy;
import dev.dbos.transact.workflow.Step;
Expand Down Expand Up @@ -164,6 +166,76 @@ public void registerQueues(@NonNull Queue... queues) {
}
}

/**
* Register a database-backed dynamic queue. Must be called after launch. Queue configuration can
* be updated at runtime via {@link #updateQueue(String, QueueOptions)}.
*
* <p>Uses {@link QueueConflictResolution#UPDATE_IF_LATEST_VERSION} by default: the existing
* configuration is overwritten only if this executor is running the latest application version.
*
* @param name Queue name
* @param options Initial configuration options
*/
public void registerQueue(@NonNull String name, @NonNull QueueOptions options) {
Comment thread
devhawk marked this conversation as resolved.
ensureLaunched("registerQueue")
.registerDynamicQueue(name, options, QueueConflictResolution.UPDATE_IF_LATEST_VERSION);
}

/**
* Register a database-backed dynamic queue. Must be called after launch. Queue configuration can
* be updated at runtime via {@link #updateQueue(String, QueueOptions)}.
*
* @param name Queue name
* @param options Initial configuration options
* @param onConflict How to handle an existing queue with the same name
*/
public void registerQueue(
@NonNull String name,
@NonNull QueueOptions options,
@NonNull QueueConflictResolution onConflict) {
ensureLaunched("registerQueue").registerDynamicQueue(name, options, onConflict);
}

/**
* Update the configuration of a database-backed dynamic queue. Must be called after launch. Only
* fields that are present in {@code options} are written; absent fields are left unchanged.
*
* @param name Queue name
* @param options Fields to update
*/
public void updateQueue(@NonNull String name, @NonNull QueueOptions options) {
ensureLaunched("updateQueue").updateDynamicQueue(name, options);
}

/**
* Retrieve a database-backed dynamic queue by name. Must be called after launch.
*
* @param name Queue name
* @return the queue if it exists in the database, or empty
*/
public @NonNull Optional<Queue> findQueue(@NonNull String name) {
return ensureLaunched("findQueue").findDynamicQueue(name);
}

/**
* Delete a database-backed dynamic queue. Must be called after launch.
*
* @param name Queue name
* @return true if the queue was deleted, false if it did not exist
*/
public boolean deleteQueue(@NonNull String name) {
return ensureLaunched("deleteQueue").deleteDynamicQueue(name);
}

/**
* List all database-backed dynamic queues. Must be called after launch.
*
* @return list of all queues currently registered in the database
*/
public @NonNull List<Queue> listQueues() {
return ensureLaunched("listQueues").listDynamicQueues();
}

/**
* Register all workflows and steps in the provided class instance
*
Expand Down Expand Up @@ -311,7 +383,7 @@ private DBOSExecutor ensureLaunched(String caller) {
* @return Optional containing the queue definition for given `queueName`, or empty if not found
*/
public @NonNull Optional<Queue> getQueue(@NonNull String queueName) {
return ensureLaunched("getQueue").getQueue(queueName);
return ensureLaunched("getQueue").findStaticQueue(queueName);
}

/**
Expand Down
77 changes: 77 additions & 0 deletions transact/src/main/java/dev/dbos/transact/DBOSClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import dev.dbos.transact.json.SerializationUtil;
import dev.dbos.transact.workflow.ForkOptions;
import dev.dbos.transact.workflow.ListWorkflowsInput;
import dev.dbos.transact.workflow.Queue;
import dev.dbos.transact.workflow.QueueConflictResolution;
import dev.dbos.transact.workflow.QueueOptions;
import dev.dbos.transact.workflow.ScheduleStatus;
import dev.dbos.transact.workflow.SerializationStrategy;
import dev.dbos.transact.workflow.StepInfo;
Expand Down Expand Up @@ -1097,4 +1100,78 @@ public void setWorkflowDelay(@NonNull String workflowId, @NonNull Instant delayU
Objects.requireNonNull(delayUntil, "delayUntil must not be null"));
systemDatabase.setWorkflowDelay(workflowId, wfDelay);
}

/**
* Register a database-backed dynamic queue. Uses {@link QueueConflictResolution#ALWAYS_UPDATE} by
* default.
*
* @param name Queue name
* @param options Configuration options
*/
public void registerQueue(@NonNull String name, @NonNull QueueOptions options) {
registerQueue(name, options, QueueConflictResolution.ALWAYS_UPDATE);
}

/**
* Register a database-backed dynamic queue.
*
* <p>{@link QueueConflictResolution#UPDATE_IF_LATEST_VERSION} is not supported by {@code
* DBOSClient} because clients are not associated with an application version. Use {@link
* QueueConflictResolution#ALWAYS_UPDATE} or {@link QueueConflictResolution#NEVER_UPDATE}.
*
* @param name Queue name
* @param options Configuration options
* @param onConflict How to handle an existing queue with the same name
*/
public void registerQueue(
@NonNull String name,
@NonNull QueueOptions options,
@NonNull QueueConflictResolution onConflict) {
if (onConflict == QueueConflictResolution.UPDATE_IF_LATEST_VERSION) {
throw new IllegalArgumentException(
"DBOSClient.registerQueue does not support UPDATE_IF_LATEST_VERSION because clients are"
+ " not associated with an application version. Use ALWAYS_UPDATE or NEVER_UPDATE.");
}
systemDatabase.upsertQueue(name, options, onConflict == QueueConflictResolution.ALWAYS_UPDATE);
}

/**
* Update the configuration of a database-backed dynamic queue. Only fields that are present in
* {@code options} are written; absent fields are left unchanged.
*
* @param name Queue name
* @param options Fields to update
*/
public void updateQueue(@NonNull String name, @NonNull QueueOptions options) {
systemDatabase.updateQueue(name, options);
}

/**
* Retrieve a database-backed dynamic queue by name.
*
* @param name Queue name
* @return the queue if it exists in the database, or empty
*/
public @NonNull Optional<Queue> findQueue(@NonNull String name) {
return systemDatabase.findQueue(name);
}

/**
* List all database-backed dynamic queues.
*
* @return list of all queues currently registered in the database
*/
public @NonNull List<Queue> listQueues() {
return systemDatabase.listQueues();
}

/**
* Delete a database-backed dynamic queue.
*
* @param name Queue name
* @return true if the queue was deleted, false if it did not exist
*/
public boolean deleteQueue(@NonNull String name) {
return systemDatabase.deleteQueue(name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void deactivate(HttpExchange exchange) throws IOException {
}

private void workflowQueuesMetadata(HttpExchange exchange) throws IOException {
var queues = dbosExecutor.getQueues();
var queues = dbosExecutor.getStaticQueues();
sendMappedJson(exchange, 200, queues);
}

Expand Down
32 changes: 32 additions & 0 deletions transact/src/main/java/dev/dbos/transact/conductor/Conductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import dev.dbos.transact.workflow.ExportedWorkflow;
import dev.dbos.transact.workflow.ForkOptions;
import dev.dbos.transact.workflow.ListWorkflowsInput;
import dev.dbos.transact.workflow.Queue;
import dev.dbos.transact.workflow.StepInfo;
import dev.dbos.transact.workflow.WorkflowHandle;
import dev.dbos.transact.workflow.WorkflowSchedule;
Expand Down Expand Up @@ -702,6 +703,7 @@ CompletableFuture<BaseResponse> getResponseAsync(BaseMessage message, WebSocket
case EXPORT_WORKFLOW -> handleExportWorkflow(this, message, ws);
case FORK_WORKFLOW -> handleFork(this, message);
case GET_METRICS -> handleGetMetrics(this, message);
case GET_QUEUE -> handleGetQueue(this, message);
case GET_SCHEDULE -> handleGetSchedule(this, message);
case GET_WORKFLOW_AGGREGATES -> handleGetWorkflowAggregates(this, message);
case GET_WORKFLOW_EVENTS -> handleGetWorkflowEvents(this, message);
Expand All @@ -711,6 +713,7 @@ CompletableFuture<BaseResponse> getResponseAsync(BaseMessage message, WebSocket
case IMPORT_WORKFLOW -> handleImportWorkflow(this, message);
case LIST_APPLICATION_VERSIONS -> handleListApplicationVersions(this, message);
case LIST_QUEUED_WORKFLOWS -> handleListQueuedWorkflows(this, message);
case LIST_QUEUES -> handleListQueues(this, message);
case LIST_SCHEDULES -> handleListSchedules(this, message);
case LIST_STEPS -> handleListSteps(this, message);
case LIST_WORKFLOWS -> handleListWorkflows(this, message);
Expand Down Expand Up @@ -1371,6 +1374,35 @@ static CompletableFuture<BaseResponse> handleGetSchedule(
});
}

static CompletableFuture<BaseResponse> handleListQueues(
Conductor conductor, BaseMessage message) {
return CompletableFuture.supplyAsync(
() -> {
try {
List<Queue> queues = conductor.systemDatabase.listQueues();
List<QueueOutput> output = queues.stream().map(QueueOutput::from).toList();
return new ListQueuesResponse(message, output);
} catch (Exception e) {
logger.error("Exception encountered when listing queues", e);
return new ListQueuesResponse(message, e.getMessage());
}
});
}

static CompletableFuture<BaseResponse> handleGetQueue(Conductor conductor, BaseMessage message) {
return CompletableFuture.supplyAsync(
() -> {
GetQueueRequest request = (GetQueueRequest) message;
try {
var queue = conductor.systemDatabase.findQueue(request.name);
return new GetQueueResponse(request, queue.map(QueueOutput::from).orElse(null));
} catch (Exception e) {
logger.error("Exception encountered when getting queue {}", request.name, e);
return new GetQueueResponse(request, e.getMessage());
}
});
}

static CompletableFuture<BaseResponse> handlePauseSchedule(
Conductor conductor, BaseMessage message) {
return CompletableFuture.supplyAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
@JsonSubTypes.Type(value = ExportWorkflowRequest.class, name = "export_workflow"),
@JsonSubTypes.Type(value = ForkWorkflowRequest.class, name = "fork_workflow"),
@JsonSubTypes.Type(value = GetMetricsRequest.class, name = "get_metrics"),
@JsonSubTypes.Type(value = GetQueueRequest.class, name = "get_queue"),
@JsonSubTypes.Type(value = GetScheduleRequest.class, name = "get_schedule"),
@JsonSubTypes.Type(value = GetWorkflowAggregatesRequest.class, name = "get_workflow_aggregates"),
@JsonSubTypes.Type(value = GetWorkflowEventsRequest.class, name = "get_workflow_events"),
Expand All @@ -32,6 +33,7 @@
value = ListApplicationVersionsRequest.class,
name = "list_application_versions"),
@JsonSubTypes.Type(value = ListQueuedWorkflowsRequest.class, name = "list_queued_workflows"),
@JsonSubTypes.Type(value = ListQueuesRequest.class, name = "list_queues"),
@JsonSubTypes.Type(value = ListSchedulesRequest.class, name = "list_schedules"),
@JsonSubTypes.Type(value = ListStepsRequest.class, name = "list_steps"),
@JsonSubTypes.Type(value = ListWorkflowsRequest.class, name = "list_workflows"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package dev.dbos.transact.conductor.protocol;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public class GetQueueRequest extends BaseMessage {
public String name;

public GetQueueRequest() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package dev.dbos.transact.conductor.protocol;

public class GetQueueResponse extends BaseResponse {
public QueueOutput output;

public GetQueueResponse() {}

public GetQueueResponse(BaseMessage message, QueueOutput output) {
super(message.type, message.request_id);
this.output = output;
}

public GetQueueResponse(BaseMessage message, String errorMessage) {
super(message.type, message.request_id, errorMessage);
this.output = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package dev.dbos.transact.conductor.protocol;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public class ListQueuesRequest extends BaseMessage {
public ListQueuesRequest() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package dev.dbos.transact.conductor.protocol;

import java.util.Collections;
import java.util.List;

public class ListQueuesResponse extends BaseResponse {
public List<QueueOutput> output;

public ListQueuesResponse() {}

public ListQueuesResponse(BaseMessage message, List<QueueOutput> output) {
super(message.type, message.request_id);
this.output = output;
}

public ListQueuesResponse(BaseMessage message, String errorMessage) {
super(message.type, message.request_id, errorMessage);
this.output = Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public enum MessageType {
EXPORT_WORKFLOW("export_workflow"),
FORK_WORKFLOW("fork_workflow"),
GET_METRICS("get_metrics"),
GET_QUEUE("get_queue"),
GET_SCHEDULE("get_schedule"),
GET_WORKFLOW_AGGREGATES("get_workflow_aggregates"),
GET_WORKFLOW_EVENTS("get_workflow_events"),
Expand All @@ -19,6 +20,7 @@ public enum MessageType {
IMPORT_WORKFLOW("import_workflow"),
LIST_APPLICATION_VERSIONS("list_application_versions"),
LIST_QUEUED_WORKFLOWS("list_queued_workflows"),
LIST_QUEUES("list_queues"),
LIST_SCHEDULES("list_schedules"),
LIST_STEPS("list_steps"),
LIST_WORKFLOWS("list_workflows"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package dev.dbos.transact.conductor.protocol;

import dev.dbos.transact.workflow.Queue;

public record QueueOutput(
String name,
Integer concurrency,
Integer worker_concurrency,
Integer rate_limit_max,
Double rate_limit_period_sec,
boolean priority_enabled,
boolean partition_queue,
double polling_interval_sec) {

public static QueueOutput from(Queue q) {
Queue.RateLimit rl = q.rateLimit();
return new QueueOutput(
q.name(),
q.concurrency(),
q.workerConcurrency(),
rl != null ? rl.limit() : null,
rl != null ? rl.period().toMillis() / 1000.0 : null,
q.priorityEnabled(),
q.partitioningEnabled(),
q.pollingInterval().toMillis() / 1000.0);
}
}
Loading