Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
ee8a962
Add Debouncer — coalesce rapid workflow calls into one execution
easmith May 19, 2026
421d62d
Fix two replay-safety bugs in Debouncer
easmith May 19, 2026
a5354c0
Fix type coercion for numeric args in DebouncerServiceImpl
easmith May 19, 2026
7302b63
Replace O(N) debouncer scan with O(1) dedup-id point lookup
easmith May 19, 2026
659db1a
Verify and fix debouncer durability for workflow-context call path
easmith May 19, 2026
c74ced0
Align Debouncer with Python semantics
easmith May 19, 2026
2d39757
Fix send accumulation and dead code in dedup path
easmith May 19, 2026
5b8faec
Add debouncer tests; fix priority context propagation for dequeued wo…
easmith May 19, 2026
fcc47f8
Merge branch 'main' into feature/debouncer
devhawk May 20, 2026
e7b8519
Address minor PR review comments for Debouncer
easmith May 21, 2026
46af720
Duration/Instant classes for time related calculations
easmith May 21, 2026
29fc252
Fix remaining minor PR review comments
easmith May 21, 2026
7d0f973
Add DebouncerClient for external callers without a running DBOS instance
easmith May 21, 2026
6968e56
Remove appVersion/priority/deduplicationId from DBOSContext
easmith May 21, 2026
9d649d1
Give Debouncer a direct DBOSExecutor reference
easmith May 21, 2026
4b79d27
spotlessApply
devhawk May 21, 2026
a40faeb
Make runDbosFunctionAsStep public for Debouncer
easmith May 21, 2026
0f360ee
Replace DebouncerServiceImpl with InternalWorkflows
easmith May 21, 2026
827a50f
Rudiments in comments
easmith May 21, 2026
aba8d05
Track internal workflows in a separate registry
easmith May 27, 2026
ea07bb4
Use portable class name for internal debouncer workflow
easmith May 27, 2026
fd142cc
Merge branch 'main' into feature/debouncer
easmith May 27, 2026
ce9f7c5
Merge branch 'main' into feature/debouncer
devhawk May 27, 2026
d77f70f
Fail debounced workflow with ERROR instead of hanging when unregistered
easmith May 28, 2026
40215c3
Use runDbosFunctionAsStep directly in Debouncer instead of manual inW…
easmith May 28, 2026
a25a54a
Rename internal debouncer workflow to debouncerWorkflow, drop redunda…
easmith May 28, 2026
fa57962
Add recovery test: replaying debouncer workflow must not restart user…
easmith May 28, 2026
dee7b87
Test withDeduplicationId forwarding and harden debouncer recovery test
easmith May 28, 2026
f0177d7
spotless
devhawk May 28, 2026
0ba8506
Call executor/registry directly from internal workflows, drop redunda…
easmith May 28, 2026
25d668c
minor change to check executorSupplier.get response
devhawk May 28, 2026
0af1936
Merge branch 'main' into feature/debouncer
devhawk May 28, 2026
ee79c2f
remove unused field
devhawk May 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions transact/src/main/java/dev/dbos/transact/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ public class Constants {

public static final String DBOS_INTERNAL_QUEUE = "_dbos_internal_queue";

public static final String DEBOUNCER_WORKFLOW_NAME = "debouncerWorkflow";
public static final String DEBOUNCER_CLASS_NAME = "DBOS.InternalWorkflows";
public static final String DEBOUNCER_TOPIC = "_dbos_debouncer_topic";
// Event key published by the debouncer-workflow so callers can retrieve the pre-assigned
// user workflow id without relying on Jackson deserialization of workflow inputs.
public static final String DEBOUNCER_CHILD_ID_KEY = "_dbos_debouncer_child_id";

public static final String SYSTEM_JDBC_URL_ENV_VAR = "DBOS_SYSTEM_JDBC_URL";

public static final int DEFAULT_MAX_RECOVERY_ATTEMPTS = 100;
Expand Down
31 changes: 31 additions & 0 deletions transact/src/main/java/dev/dbos/transact/DBOS.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import dev.dbos.transact.context.DBOSContext;
import dev.dbos.transact.execution.DBOSExecutor;
import dev.dbos.transact.execution.DBOSLifecycleListener;
import dev.dbos.transact.execution.RegisteredWorkflow;
import dev.dbos.transact.execution.ThrowingRunnable;
import dev.dbos.transact.execution.ThrowingSupplier;
import dev.dbos.transact.internal.DBOSIntegration;
import dev.dbos.transact.internal.DBOSInvocationHandler;
import dev.dbos.transact.internal.QueueRegistry;
import dev.dbos.transact.internal.WorkflowRegistry;
import dev.dbos.transact.migrations.MigrationManager;
import dev.dbos.transact.workflow.Debouncer;
import dev.dbos.transact.workflow.ForkOptions;
import dev.dbos.transact.workflow.ListWorkflowsInput;
import dev.dbos.transact.workflow.Queue;
Expand All @@ -28,6 +30,7 @@
import dev.dbos.transact.workflow.WorkflowHandle;
import dev.dbos.transact.workflow.WorkflowSchedule;
import dev.dbos.transact.workflow.WorkflowStatus;
import dev.dbos.transact.workflow.internal.InternalWorkflows;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -65,6 +68,7 @@ public class DBOS implements AutoCloseable {
private final DBOSConfig config;
private final AtomicReference<DBOSExecutor> dbosExecutor = new AtomicReference<>();
private final DBOSIntegration integration;
private final RegisteredWorkflow debouncerWorkflow;

private AlertHandler alertHandler;

Expand All @@ -87,6 +91,16 @@ public DBOS(@NonNull DBOSConfig config) {
this.integration =
new DBOSIntegration(
this.config, this.workflowRegistry, dbosExecutor::get, this::registerLifecycleListener);
// Register the built-in debouncer service workflow directly (without a proxy) so callers can
// use Debouncer without having to declare and wire the service themselves.
var internalWorkflows = new InternalWorkflows(this, dbosExecutor::get);
workflowRegistry.registerInternalInstance(internalWorkflows);
this.debouncerWorkflow =
Comment thread
easmith marked this conversation as resolved.
workflowRegistry.registerInternalWorkflow(
Constants.DEBOUNCER_WORKFLOW_NAME,
Constants.DEBOUNCER_CLASS_NAME,
internalWorkflows,
InternalWorkflows.debouncerWorkflowMethod());
}

/**
Expand Down Expand Up @@ -346,6 +360,8 @@ public void launch() {
new HashSet<>(this.lifecycleRegistry),
workflowRegistry.getWorkflowSnapshot(),
workflowRegistry.getInstanceSnapshot(),
workflowRegistry.getInternalWorkflowSnapshot(),
workflowRegistry.getInternalInstanceSnapshot(),
queueRegistry.getSnapshot(),
alertHandler);
}
Expand Down Expand Up @@ -462,6 +478,21 @@ public void sleep(@NonNull Duration duration) {
return startWorkflow(runnable, null);
}

/**
* Build a {@link Debouncer} that consolidates a series of calls on the same key into one
* execution of the targeted workflow using the most recent arguments.
*
* <p>The returned debouncer is immutable; configuration helpers like {@link
* Debouncer#withQueue(String)} and {@link Debouncer#withDebounceTimeout(java.time.Duration)}
* return new instances.
*
* @param <R> the return type of the debounced workflow (used only for type inference)
* @return a fresh debouncer bound to this DBOS instance
*/
public <R> @NonNull Debouncer<R> debouncer() {
Comment thread
easmith marked this conversation as resolved.
return new Debouncer<>(this, ensureLaunched("debouncer"), debouncerWorkflow);
}

/**
* Returns the DBOS integration APIs for use by specialized integrations such as AOP aspects and
* event listeners.
Expand Down
23 changes: 23 additions & 0 deletions transact/src/main/java/dev/dbos/transact/DBOSClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,29 @@ public void sendBulk(@NonNull List<SendMessage> messages, @Nullable SendOptions
return new WorkflowHandleClient<>(workflowId);
}

/**
* Find the workflow ID of the active workflow with the given queue and deduplication ID.
*
* @param queueName name of the queue to search
* @param deduplicationId deduplication ID to look up
* @return the workflow ID, or {@code null} if not found
*/
public @Nullable String findWorkflowIdByDeduplicationId(
@NonNull String queueName, @NonNull String deduplicationId) {
return systemDatabase.findWorkflowIdByDeduplicationId(queueName, deduplicationId);
}

/**
* Create a {@link DebouncerClient} that coalesces repeated calls on the same key into a single
* execution of the named workflow.
*
* @param workflowName name of the workflow function to debounce
* @return a new DebouncerClient bound to this client
*/
public <R> @NonNull DebouncerClient<R> debouncer(@NonNull String workflowName) {
return new DebouncerClient<>(this, workflowName);
}

/**
* Cancel a worflow
*
Expand Down
Loading