Skip to content

Commit a8039cf

Browse files
easmithdevhawk
andauthored
Add Debouncer — coalesce rapid workflow calls into one execution (#390)
Implements a debounce mechanism for DBOS workflows analogous to dbos-transact-py _debouncer.py. Multiple calls with the same key within a period are collapsed into a single user-workflow execution that runs with the most recently supplied arguments. Architecture: - DebouncerServiceImpl: internal @workflow that runs a recv-loop, absorbing messages until the debounce period times out or the absolute debounceTimeout elapses, then starts the user workflow. - Debouncer<R>: public fluent API. Enqueues the service workflow on _dbos_internal_queue with a deduplicationId derived from (workflowName, debounceKey). On DBOSQueueDuplicatedException, forwards a message to the running debouncer and waits for an ack. - DBOSExecutor.captureInvocation(): extracted from startWorkflow so Debouncer can capture a lambda's workflow call without executing it. - Auto-registration of DebouncerService in DBOS constructor so users need no boilerplate setup. - Internal system workflows filtered from getRegisteredWorkflows / getRegisteredWorkflowInstances to keep public counts clean. Usage: ```java var handle = dbos.<String>debouncer() .withDebounceTimeout(Duration.ofMinutes(5)) .debounce("key", Duration.ofSeconds(2), () -> svc.process(arg)); String result = handle.getResult(); ``` Tests: 6 integration tests via Testcontainers Postgres covering single-call, multi-call coalescing, absolute timeout, independent keys, concurrent callers, and queue-based user workflow. --------- Co-authored-by: Harry Pierson <harrypierson@hotmail.com> Co-authored-by: Harry Pierson <harry.pierson@dbos.dev>
1 parent caddbbb commit a8039cf

17 files changed

Lines changed: 1783 additions & 55 deletions

transact/src/main/java/dev/dbos/transact/Constants.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ public class Constants {
1616

1717
public static final String DBOS_INTERNAL_QUEUE = "_dbos_internal_queue";
1818

19+
public static final String DEBOUNCER_WORKFLOW_NAME = "debouncerWorkflow";
20+
public static final String DEBOUNCER_CLASS_NAME = "DBOS.InternalWorkflows";
21+
public static final String DEBOUNCER_TOPIC = "_dbos_debouncer_topic";
22+
// Event key published by the debouncer-workflow so callers can retrieve the pre-assigned
23+
// user workflow id without relying on Jackson deserialization of workflow inputs.
24+
public static final String DEBOUNCER_CHILD_ID_KEY = "_dbos_debouncer_child_id";
25+
1926
public static final String SYSTEM_JDBC_URL_ENV_VAR = "DBOS_SYSTEM_JDBC_URL";
2027

2128
public static final int DEFAULT_MAX_RECOVERY_ATTEMPTS = 100;

transact/src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
import dev.dbos.transact.context.DBOSContext;
55
import dev.dbos.transact.execution.DBOSExecutor;
66
import dev.dbos.transact.execution.DBOSLifecycleListener;
7+
import dev.dbos.transact.execution.RegisteredWorkflow;
78
import dev.dbos.transact.execution.ThrowingRunnable;
89
import dev.dbos.transact.execution.ThrowingSupplier;
910
import dev.dbos.transact.internal.DBOSIntegration;
1011
import dev.dbos.transact.internal.DBOSInvocationHandler;
1112
import dev.dbos.transact.internal.QueueRegistry;
1213
import dev.dbos.transact.internal.WorkflowRegistry;
1314
import dev.dbos.transact.migrations.MigrationManager;
15+
import dev.dbos.transact.workflow.Debouncer;
1416
import dev.dbos.transact.workflow.ForkOptions;
1517
import dev.dbos.transact.workflow.ListWorkflowsInput;
1618
import dev.dbos.transact.workflow.Queue;
@@ -28,6 +30,7 @@
2830
import dev.dbos.transact.workflow.WorkflowHandle;
2931
import dev.dbos.transact.workflow.WorkflowSchedule;
3032
import dev.dbos.transact.workflow.WorkflowStatus;
33+
import dev.dbos.transact.workflow.internal.InternalWorkflows;
3134

3235
import java.io.IOException;
3336
import java.io.InputStream;
@@ -65,6 +68,7 @@ public class DBOS implements AutoCloseable {
6568
private final DBOSConfig config;
6669
private final AtomicReference<DBOSExecutor> dbosExecutor = new AtomicReference<>();
6770
private final DBOSIntegration integration;
71+
private final RegisteredWorkflow debouncerWorkflow;
6872

6973
private AlertHandler alertHandler;
7074

@@ -87,6 +91,16 @@ public DBOS(@NonNull DBOSConfig config) {
8791
this.integration =
8892
new DBOSIntegration(
8993
this.config, this.workflowRegistry, dbosExecutor::get, this::registerLifecycleListener);
94+
// Register the built-in debouncer service workflow directly (without a proxy) so callers can
95+
// use Debouncer without having to declare and wire the service themselves.
96+
var internalWorkflows = new InternalWorkflows(this, dbosExecutor::get);
97+
workflowRegistry.registerInternalInstance(internalWorkflows);
98+
this.debouncerWorkflow =
99+
workflowRegistry.registerInternalWorkflow(
100+
Constants.DEBOUNCER_WORKFLOW_NAME,
101+
Constants.DEBOUNCER_CLASS_NAME,
102+
internalWorkflows,
103+
InternalWorkflows.debouncerWorkflowMethod());
90104
}
91105

92106
/**
@@ -346,6 +360,8 @@ public void launch() {
346360
new HashSet<>(this.lifecycleRegistry),
347361
workflowRegistry.getWorkflowSnapshot(),
348362
workflowRegistry.getInstanceSnapshot(),
363+
workflowRegistry.getInternalWorkflowSnapshot(),
364+
workflowRegistry.getInternalInstanceSnapshot(),
349365
queueRegistry.getSnapshot(),
350366
alertHandler);
351367
}
@@ -462,6 +478,21 @@ public void sleep(@NonNull Duration duration) {
462478
return startWorkflow(runnable, null);
463479
}
464480

481+
/**
482+
* Build a {@link Debouncer} that consolidates a series of calls on the same key into one
483+
* execution of the targeted workflow using the most recent arguments.
484+
*
485+
* <p>The returned debouncer is immutable; configuration helpers like {@link
486+
* Debouncer#withQueue(String)} and {@link Debouncer#withDebounceTimeout(java.time.Duration)}
487+
* return new instances.
488+
*
489+
* @param <R> the return type of the debounced workflow (used only for type inference)
490+
* @return a fresh debouncer bound to this DBOS instance
491+
*/
492+
public <R> @NonNull Debouncer<R> debouncer() {
493+
return new Debouncer<>(this, ensureLaunched("debouncer"), debouncerWorkflow);
494+
}
495+
465496
/**
466497
* Returns the DBOS integration APIs for use by specialized integrations such as AOP aspects and
467498
* event listeners.

transact/src/main/java/dev/dbos/transact/DBOSClient.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,29 @@ public void sendBulk(@NonNull List<SendMessage> messages, @Nullable SendOptions
815815
return new WorkflowHandleClient<>(workflowId);
816816
}
817817

818+
/**
819+
* Find the workflow ID of the active workflow with the given queue and deduplication ID.
820+
*
821+
* @param queueName name of the queue to search
822+
* @param deduplicationId deduplication ID to look up
823+
* @return the workflow ID, or {@code null} if not found
824+
*/
825+
public @Nullable String findWorkflowIdByDeduplicationId(
826+
@NonNull String queueName, @NonNull String deduplicationId) {
827+
return systemDatabase.findWorkflowIdByDeduplicationId(queueName, deduplicationId);
828+
}
829+
830+
/**
831+
* Create a {@link DebouncerClient} that coalesces repeated calls on the same key into a single
832+
* execution of the named workflow.
833+
*
834+
* @param workflowName name of the workflow function to debounce
835+
* @return a new DebouncerClient bound to this client
836+
*/
837+
public <R> @NonNull DebouncerClient<R> debouncer(@NonNull String workflowName) {
838+
return new DebouncerClient<>(this, workflowName);
839+
}
840+
818841
/**
819842
* Cancel a worflow
820843
*

0 commit comments

Comments
 (0)