Dyanmic Queue Support#391
Conversation
There was a problem hiding this comment.
Pull request overview
Adds database-backed “dynamic queues” that can be created/updated at runtime and discovered by executors, including new CRUD APIs on DBOS/DBOSClient, persisted schema/DAO support, conductor protocol messages for queue metadata, and runtime polling to start/stop listeners for DB-defined queues.
Changes:
- Introduces
QueueOptions+Field<T>three-state update model andQueueConflictResolutionto controlregisterQueuesemantics. - Implements DB persistence and CRUD for queues (
queuestable +QueuesDAO+SystemDatabaseAPIs) and exposes queue info via conductor (get_queue/list_queues). - Updates runtime services to support dynamic queue listener lifecycle and refactors scheduler loop.
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| transact/src/test/java/dev/dbos/transact/utils/PgContainer.java | Truncates dbos.queues between pooled DB test runs. |
| transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java | Updates limiter test to align with new rate-limit API usage. |
| transact/src/test/java/dev/dbos/transact/queue/DynamicQueuesTest.java | Adds integration tests for dynamic queue CRUD + execution. |
| transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java | Adds SystemDatabase-level queue CRUD/partial-update tests. |
| transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java | Adds conductor protocol tests for get_queue / list_queues. |
| transact/src/test/java/dev/dbos/transact/client/ClientQueueTest.java | Adds DBOSClient tests for dynamic queue APIs/conflict modes. |
| transact/src/test/java/dev/dbos/transact/admin/AdminServerTest.java | Updates admin queue metadata test to use static queues API and new rate-limit signature. |
| transact/src/main/java/dev/dbos/transact/workflow/QueueOptions.java | New immutable options record with three-state fields and fluent builders. |
| transact/src/main/java/dev/dbos/transact/workflow/QueueConflictResolution.java | New enum controlling registerQueue behavior on conflicts. |
| transact/src/main/java/dev/dbos/transact/workflow/Queue.java | Adds pollingInterval, validation, and replaces rate-limit overload with TimeUnit-based method. |
| transact/src/main/java/dev/dbos/transact/workflow/Field.java | New sealed three-state field abstraction used by QueueOptions. |
| transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java | Adjusts queue schema column types to INT4 for CRDB compatibility. |
| transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java | Inlines scheduler poll implementation and switches to findQueue validation. |
| transact/src/main/java/dev/dbos/transact/execution/QueueService.java | Adds dynamic queue supervisor polling + dynamic listener lifecycle. |
| transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java | Adds dynamic queue operations and splits “static queue” lookup from DB-backed lookup. |
| transact/src/main/java/dev/dbos/transact/DBOSClient.java | Exposes dynamic queue CRUD APIs to clients (DB-backed). |
| transact/src/main/java/dev/dbos/transact/DBOS.java | Exposes dynamic queue CRUD APIs on the runtime and narrows getQueue() to static queues. |
| transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java | Adds queue CRUD methods delegating to QueuesDAO. |
| transact/src/main/java/dev/dbos/transact/database/dao/QueuesDAO.java | Implements queue upsert/update/find/list/delete + DTO conversions. |
| transact/src/main/java/dev/dbos/transact/conductor/protocol/QueueOutput.java | Adds conductor DTO for queue metadata serialization. |
| transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java | Adds GET_QUEUE and LIST_QUEUES message types. |
| transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuesResponse.java | Adds response wrapper for list_queues. |
| transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuesRequest.java | Adds request type for list_queues. |
| transact/src/main/java/dev/dbos/transact/conductor/protocol/GetQueueResponse.java | Adds response wrapper for get_queue. |
| transact/src/main/java/dev/dbos/transact/conductor/protocol/GetQueueRequest.java | Adds request type for get_queue. |
| transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java | Registers new request subtypes for polymorphic decoding. |
| transact/src/main/java/dev/dbos/transact/conductor/Conductor.java | Implements handlers for get_queue / list_queues. |
| transact/src/main/java/dev/dbos/transact/admin/AdminServer.java | Updates admin endpoint to return static queues only. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
7ceb686 to
73340cd
Compare
kraftp
left a comment
There was a problem hiding this comment.
Looks good, modulo one last test we need to add
The test existed, but dynamic queue tests were spread across two files. I combined them all into the DynamicQueuesTest class. The test in question was testListenQueue |
PR Summary: Dynamic Queues (
devhawk/dynamic-queues)New public API (
DBOSandDBOSClient)registerQueue(name, QueueOptions)— creates or updates a queue persisted in the databaseregisterQueue(name, QueueOptions, QueueConflictResolution)— same with explicit conflict handlingupdateQueue(name, QueueOptions)— partial update; only present fields are written, absent fields left unchangedfindQueue(name)— look up a queue by namelistQueues()— list all database-backed queuesdeleteQueue(name)— remove a queue from the databaseNew types
QueueOptions— immutable record with aField<T>for each queue property, supporting three states: absent (don't touch), present-with-value, or present-with-null (clear). Fluent builder API viaset*()static factories andand*()chainers.QueueConflictResolution— enum controllingregisterQueuebehavior:ALWAYS_UPDATE,NEVER_UPDATE, orUPDATE_IF_LATEST_VERSION(the last is only available onDBOS, notDBOSClient, since clients have no concept of app version).Field<T>— sealed interface (Absent/Present) for three-state optional values used byQueueOptions.Queuerecord changespollingIntervalfield (defaults to 1 second) with validationwithRateLimit(int, double)replaced bywithRateLimit(int, long, TimeUnit)for type safetyRuntime: dynamic queue polling (
QueueService)pollDynamicQueues()supervisor runs every second, reads all queues from the database, and starts/stopsQueueListenerTaskinstances accordinglylistenQueuesfilter respected for dynamic queues tooSchedulerServicerefactorpollWorkflowSchedulesImplinlined; scheduler loop simplifiedConductor protocol
get_queue/list_queuesrequest+response message types andQueueOutputDTO, so the conductor can serve queue info to clientsSchema / DAO
QueuesDAO— full implementation:upsertQueue,updateQueue,findQueue,listQueues,deleteQueue,getQueuePartitionsINT4(explicitly 32-bit) forconcurrency,worker_concurrency, andrate_limit_maxcolumns — fixes a CRDB incompatibility whereINTEGERdefaults to 64-bitint8fixes #372