Add Debouncer — coalesce rapid workflow calls into one execution#390
Conversation
Implements a debounce mechanism for DBOS workflows analogous to dbos-transact-py _debouncer.py. Multiple calls with the same key within a period are collapsed into a single user-workflow execution that runs with the most recently supplied arguments. Architecture: - DebouncerServiceImpl: internal @workflow that runs a recv-loop, absorbing messages until the debounce period times out or the absolute debounceTimeout elapses, then starts the user workflow. - Debouncer<R>: public fluent API. Enqueues the service workflow on _dbos_internal_queue with a deduplicationId derived from (workflowName, debounceKey). On DBOSQueueDuplicatedException, forwards a message to the running debouncer and waits for an ack. - DBOSExecutor.captureInvocation(): extracted from startWorkflow so Debouncer can capture a lambda's workflow call without executing it. - Auto-registration of DebouncerService in DBOS constructor so users need no boilerplate setup. - Internal system workflows filtered from getRegisteredWorkflows / getRegisteredWorkflowInstances to keep public counts clean. Usage: var handle = dbos.<String>debouncer() .withDebounceTimeout(Duration.ofMinutes(5)) .debounce("key", Duration.ofSeconds(2), () -> svc.process(arg)); String result = handle.getResult(); Tests: 6 integration tests via Testcontainers Postgres covering single-call, multi-call coalescing, absolute timeout, independent keys, concurrent callers, and queue-based user workflow.
Bug 1: userWorkflowId and messageId were generated as UUID.randomUUID() outside any durable step. When debounce() is called from inside a workflow, these values differ on every replay — the returned handle points to a nonexistent workflow and the ack getEvent waits on the wrong key forever. Fix: wrap UUID generation in a runStep when called from a workflow context. Bug 2: Retrieving the existing debouncer's userWorkflowId via status.input()[1] instanceof DebouncerContextOptions always fails on replay. Java records are implicitly final; DBOSJavaSerializer uses NON_FINAL DefaultTyping, so no @Class type metadata is written for them. On deserialisation from Object.class the element comes back as LinkedHashMap, not as DebouncerContextOptions, causing an IllegalStateException. Fix: publish userWorkflowId as a named event (DEBOUNCER_CHILD_ID_KEY) at the start of the debouncer-workflow; callers read it via getEvent instead.
Object[] args round-trip through dbos.send/recv as generic JSON types: long 5L serialises to JSON 5 and back to Integer(5) when the target is Object.class, causing IllegalArgumentException when the method expects a primitive long. Adds JsonUtility.coerceArguments() call before startRegisteredWorkflow, mirroring the coercion already applied in executeWorkflowById (line 1344). Adds numericArgsRoundTripCorrectly test that exercises long/double parameters through the full debounce + coalesce path.
lookupExistingDebouncerId previously called listWorkflows and iterated all active debouncer entries in Java to find the one matching the deduplication id. When called from inside a workflow this result was also serialised as a step, making it a potential OOM bomb under load. Add WorkflowDAO.findWorkflowIdByDeduplicationId that issues a direct point-lookup on the UNIQUE (queue_name, deduplication_id) index: SELECT workflow_uuid FROM workflow_status WHERE queue_name = ? AND deduplication_id = ? Expose through SystemDatabase → DBOSExecutor → DBOSIntegration so Debouncer.lookupExistingDebouncerId becomes a single delegation call.
Two review findings investigated: Reviewed bug: "SQL without status filter -> livelock" Finding: NOT real. updateWorkflowOutcome clears deduplication_id to NULL on completion (WorkflowDAO line 329). PostgreSQL UNIQUE constraints treat NULL != NULL, so the unique slot is freed and a new enqueue succeeds without conflict. findWorkflowIdByDeduplicationId also returns null for completed debouncers since the WHERE deduplication_id = ? predicate never matches NULL. Added regression test reDebouncAfterWindowCloses that confirms two sequential debounce windows on the same key both execute correctly. Reviewed bug: "lookupExistingDebouncerId not a durable step" Finding: REAL when debounce() is called from inside a workflow. If the parent workflow crashes after DBOSQueueDuplicatedException but before the first step (send) is recorded, recovery would re-execute lookupExistingDebouncerId against the live DB rather than replaying a recorded result. This can produce a different debouncer id and break the determinism of the subsequent send and getEvent steps. Python wraps the equivalent call in call_function_as_step. Fix: when DBOS.inWorkflow() && !DBOS.inStep(), record the lookup result as a durable step "lookupDebouncer" so recovery replays it deterministically.
- Propagate caller workflow context (priority, appVersion, deduplicationId, timeout) to the user workflow via DebouncerContextOptions. Add these fields to DBOSContext, populate from ExecutionOptions. - Change debouncerWorkflow return type String → void: return value was unused, Python returns None.
- Guard send with messageSent flag: only one message per debounce() call - Replace unreachable childIdOpt.isEmpty() continue with IllegalStateException
…rkflows - Add debounceVoid, absoluteTimeoutUsesLatestArgs, priorityPropagatedFromCallerContext tests - executeWorkflowById now restores priority/appVersion from workflow_status so DBOSContext.currentPriority() is non-null inside dequeued workflows - Skip queue-option validation for dequeued/recovered workflows - DebouncerServiceImpl: skip priority/deduplicationId when no user queue
|
Hey @easmith, thanks for the contribution! I was traveling today so I haven't had a chance to review this, but I kicked off the GH actions run. I'll take a look tomorrow. (USA Pacific time zone) |
|
getting close. Still need to use
|
Add recordErrorForUnstartedWorkflow so awaiting handles fail fast; validate registration up front in Debouncer.
…orkflow/inStep checks
DebouncerClientTest.java:111 - already has that test |
|
Fixed the latest review comments and added the missing tests. |
|
I think the remaining asks from me are:
|
…nt integration wrappers - DBOS constructor registers internal workflow via workflowRegistry directly - InternalWorkflows uses Supplier<DBOSExecutor> for executor-level calls - Remove unused Debouncer fail-fast check (error surfaced in debouncerWorkflow)
|
Thanks for HIGH quality review. Ready to go =) |
I made a small change to I'm signed off but I want @kraftp to also take a look since he wrote the original debouncer code. |
| * <p>Not part of the public API — the debouncer infrastructure consumes this directly. | ||
| */ | ||
| public record DebouncerMessage( | ||
| @NonNull String messageId, @NonNull Object[] args, @NonNull Duration debouncePeriod) {} |
There was a problem hiding this comment.
How does typing work here? As far as I can tell, all tests have primitive types, what happens if a more complex type (user-defined object?) is routed through this?
There was a problem hiding this comment.
DebouncerMessage is used as a parameter for debouncerWorkflow as well as the message sent to debouncerWorkflow. Both of those code paths use the standard DBOSSerializer which is tested in JavaSerializerTest.
kraftp
left a comment
There was a problem hiding this comment.
This looks good to me, other than one comment about typing
Implements a debounce mechanism for DBOS workflows analogous to dbos-transact-py _debouncer.py. Multiple calls with the same key within a period are collapsed into a single user-workflow execution that runs with the most recently supplied arguments.
Architecture:
Usage:
Tests: 6 integration tests via Testcontainers Postgres covering single-call, multi-call coalescing, absolute timeout, independent keys, concurrent callers, and queue-based user workflow.