Skip to content

Dyanmic Queue Support#391

Merged
devhawk merged 29 commits into
mainfrom
devhawk/dynamic-queues
May 26, 2026
Merged

Dyanmic Queue Support#391
devhawk merged 29 commits into
mainfrom
devhawk/dynamic-queues

Conversation

@devhawk
Copy link
Copy Markdown
Collaborator

@devhawk devhawk commented May 22, 2026

PR Summary: Dynamic Queues (devhawk/dynamic-queues)

New public API (DBOS and DBOSClient)

  • registerQueue(name, QueueOptions) — creates or updates a queue persisted in the database
  • registerQueue(name, QueueOptions, QueueConflictResolution) — same with explicit conflict handling
  • updateQueue(name, QueueOptions) — partial update; only present fields are written, absent fields left unchanged
  • findQueue(name) — look up a queue by name
  • listQueues() — list all database-backed queues
  • deleteQueue(name) — remove a queue from the database

New types

  • QueueOptions — immutable record with a Field<T> for each queue property, supporting three states: absent (don't touch), present-with-value, or present-with-null (clear). Fluent builder API via set*() static factories and and*() chainers.
  • QueueConflictResolution — enum controlling registerQueue behavior: ALWAYS_UPDATE, NEVER_UPDATE, or UPDATE_IF_LATEST_VERSION (the last is only available on DBOS, not DBOSClient, since clients have no concept of app version).
  • Field<T> — sealed interface (Absent / Present) for three-state optional values used by QueueOptions.

Queue record changes

  • Added pollingInterval field (defaults to 1 second) with validation
  • withRateLimit(int, double) replaced by withRateLimit(int, long, TimeUnit) for type safety

Runtime: dynamic queue polling (QueueService)

  • New pollDynamicQueues() supervisor runs every second, reads all queues from the database, and starts/stops QueueListenerTask instances accordingly
  • Static queues take precedence: if a DB queue shares a name with a static queue, the static config wins and a warning is logged
  • listenQueues filter respected for dynamic queues too

SchedulerService refactor

  • pollWorkflowSchedulesImpl inlined; scheduler loop simplified

Conductor protocol

  • New get_queue / list_queues request+response message types and QueueOutput DTO, so the conductor can serve queue info to clients

Schema / DAO

  • QueuesDAO — full implementation: upsertQueue, updateQueue, findQueue, listQueues, deleteQueue, getQueuePartitions
  • Migration updated to use INT4 (explicitly 32-bit) for concurrency, worker_concurrency, and rate_limit_max columns — fixes a CRDB incompatibility where INTEGER defaults to 64-bit int8

fixes #372

@devhawk devhawk requested review from kraftp, maxdml and qianl15 May 22, 2026 00:23
Comment thread transact/src/main/java/dev/dbos/transact/DBOS.java
@devhawk devhawk marked this pull request as ready for review May 22, 2026 18:32
@devhawk devhawk requested a review from Copilot May 22, 2026 18:53
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds database-backed “dynamic queues” that can be created/updated at runtime and discovered by executors, including new CRUD APIs on DBOS/DBOSClient, persisted schema/DAO support, conductor protocol messages for queue metadata, and runtime polling to start/stop listeners for DB-defined queues.

Changes:

  • Introduces QueueOptions + Field<T> three-state update model and QueueConflictResolution to control registerQueue semantics.
  • Implements DB persistence and CRUD for queues (queues table + QueuesDAO + SystemDatabase APIs) and exposes queue info via conductor (get_queue / list_queues).
  • Updates runtime services to support dynamic queue listener lifecycle and refactors scheduler loop.

Reviewed changes

Copilot reviewed 28 out of 28 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
transact/src/test/java/dev/dbos/transact/utils/PgContainer.java Truncates dbos.queues between pooled DB test runs.
transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java Updates limiter test to align with new rate-limit API usage.
transact/src/test/java/dev/dbos/transact/queue/DynamicQueuesTest.java Adds integration tests for dynamic queue CRUD + execution.
transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java Adds SystemDatabase-level queue CRUD/partial-update tests.
transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java Adds conductor protocol tests for get_queue / list_queues.
transact/src/test/java/dev/dbos/transact/client/ClientQueueTest.java Adds DBOSClient tests for dynamic queue APIs/conflict modes.
transact/src/test/java/dev/dbos/transact/admin/AdminServerTest.java Updates admin queue metadata test to use static queues API and new rate-limit signature.
transact/src/main/java/dev/dbos/transact/workflow/QueueOptions.java New immutable options record with three-state fields and fluent builders.
transact/src/main/java/dev/dbos/transact/workflow/QueueConflictResolution.java New enum controlling registerQueue behavior on conflicts.
transact/src/main/java/dev/dbos/transact/workflow/Queue.java Adds pollingInterval, validation, and replaces rate-limit overload with TimeUnit-based method.
transact/src/main/java/dev/dbos/transact/workflow/Field.java New sealed three-state field abstraction used by QueueOptions.
transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java Adjusts queue schema column types to INT4 for CRDB compatibility.
transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java Inlines scheduler poll implementation and switches to findQueue validation.
transact/src/main/java/dev/dbos/transact/execution/QueueService.java Adds dynamic queue supervisor polling + dynamic listener lifecycle.
transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java Adds dynamic queue operations and splits “static queue” lookup from DB-backed lookup.
transact/src/main/java/dev/dbos/transact/DBOSClient.java Exposes dynamic queue CRUD APIs to clients (DB-backed).
transact/src/main/java/dev/dbos/transact/DBOS.java Exposes dynamic queue CRUD APIs on the runtime and narrows getQueue() to static queues.
transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java Adds queue CRUD methods delegating to QueuesDAO.
transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java Implements queue upsert/update/find/list/delete + DTO conversions.
transact/src/main/java/dev/dbos/transact/conductor/protocol/QueueOutput.java Adds conductor DTO for queue metadata serialization.
transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java Adds GET_QUEUE and LIST_QUEUES message types.
transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuesResponse.java Adds response wrapper for list_queues.
transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuesRequest.java Adds request type for list_queues.
transact/src/main/java/dev/dbos/transact/conductor/protocol/GetQueueResponse.java Adds response wrapper for get_queue.
transact/src/main/java/dev/dbos/transact/conductor/protocol/GetQueueRequest.java Adds request type for get_queue.
transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java Registers new request subtypes for polymorphic decoding.
transact/src/main/java/dev/dbos/transact/conductor/Conductor.java Implements handlers for get_queue / list_queues.
transact/src/main/java/dev/dbos/transact/admin/AdminServer.java Updates admin endpoint to return static queues only.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread transact/src/main/java/dev/dbos/transact/workflow/Field.java Outdated
Comment thread transact/src/main/java/dev/dbos/transact/execution/QueueService.java Outdated
Comment thread transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java Outdated
Comment thread transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java Outdated
Comment thread transact/src/test/java/dev/dbos/transact/queue/DynamicQueuesTest.java Outdated
@devhawk devhawk requested a review from kraftp May 22, 2026 20:21
Comment thread transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java
@devhawk devhawk force-pushed the devhawk/dynamic-queues branch from 7ceb686 to 73340cd Compare May 26, 2026 17:10
@devhawk devhawk requested a review from kraftp May 26, 2026 17:11
Comment thread transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java Outdated
Copy link
Copy Markdown
Member

@kraftp kraftp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, modulo one last test we need to add

@devhawk
Copy link
Copy Markdown
Collaborator Author

devhawk commented May 26, 2026

Looks good, modulo one last test we need to add

The test existed, but dynamic queue tests were spread across two files. I combined them all into the DynamicQueuesTest class. The test in question was testListenQueue

@devhawk devhawk merged commit 9ddfe70 into main May 26, 2026
17 checks passed
@devhawk devhawk deleted the devhawk/dynamic-queues branch May 26, 2026 19:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Dynamic Queues

3 participants