diff --git a/transact/src/main/java/dev/dbos/transact/Constants.java b/transact/src/main/java/dev/dbos/transact/Constants.java index f510d2c7..6750f533 100644 --- a/transact/src/main/java/dev/dbos/transact/Constants.java +++ b/transact/src/main/java/dev/dbos/transact/Constants.java @@ -16,6 +16,13 @@ public class Constants { public static final String DBOS_INTERNAL_QUEUE = "_dbos_internal_queue"; + public static final String DEBOUNCER_WORKFLOW_NAME = "debouncerWorkflow"; + public static final String DEBOUNCER_CLASS_NAME = "DBOS.InternalWorkflows"; + public static final String DEBOUNCER_TOPIC = "_dbos_debouncer_topic"; + // Event key published by the debouncer-workflow so callers can retrieve the pre-assigned + // user workflow id without relying on Jackson deserialization of workflow inputs. + public static final String DEBOUNCER_CHILD_ID_KEY = "_dbos_debouncer_child_id"; + public static final String SYSTEM_JDBC_URL_ENV_VAR = "DBOS_SYSTEM_JDBC_URL"; public static final int DEFAULT_MAX_RECOVERY_ATTEMPTS = 100; diff --git a/transact/src/main/java/dev/dbos/transact/DBOS.java b/transact/src/main/java/dev/dbos/transact/DBOS.java index 41d2670c..48438382 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOS.java +++ b/transact/src/main/java/dev/dbos/transact/DBOS.java @@ -4,6 +4,7 @@ import dev.dbos.transact.context.DBOSContext; import dev.dbos.transact.execution.DBOSExecutor; import dev.dbos.transact.execution.DBOSLifecycleListener; +import dev.dbos.transact.execution.RegisteredWorkflow; import dev.dbos.transact.execution.ThrowingRunnable; import dev.dbos.transact.execution.ThrowingSupplier; import dev.dbos.transact.internal.DBOSIntegration; @@ -11,6 +12,7 @@ import dev.dbos.transact.internal.QueueRegistry; import dev.dbos.transact.internal.WorkflowRegistry; import dev.dbos.transact.migrations.MigrationManager; +import dev.dbos.transact.workflow.Debouncer; import dev.dbos.transact.workflow.ForkOptions; import dev.dbos.transact.workflow.ListWorkflowsInput; import dev.dbos.transact.workflow.Queue; @@ -28,6 +30,7 @@ import dev.dbos.transact.workflow.WorkflowHandle; import dev.dbos.transact.workflow.WorkflowSchedule; import dev.dbos.transact.workflow.WorkflowStatus; +import dev.dbos.transact.workflow.internal.InternalWorkflows; import java.io.IOException; import java.io.InputStream; @@ -65,6 +68,7 @@ public class DBOS implements AutoCloseable { private final DBOSConfig config; private final AtomicReference dbosExecutor = new AtomicReference<>(); private final DBOSIntegration integration; + private final RegisteredWorkflow debouncerWorkflow; private AlertHandler alertHandler; @@ -87,6 +91,16 @@ public DBOS(@NonNull DBOSConfig config) { this.integration = new DBOSIntegration( this.config, this.workflowRegistry, dbosExecutor::get, this::registerLifecycleListener); + // Register the built-in debouncer service workflow directly (without a proxy) so callers can + // use Debouncer without having to declare and wire the service themselves. + var internalWorkflows = new InternalWorkflows(this, dbosExecutor::get); + workflowRegistry.registerInternalInstance(internalWorkflows); + this.debouncerWorkflow = + workflowRegistry.registerInternalWorkflow( + Constants.DEBOUNCER_WORKFLOW_NAME, + Constants.DEBOUNCER_CLASS_NAME, + internalWorkflows, + InternalWorkflows.debouncerWorkflowMethod()); } /** @@ -346,6 +360,8 @@ public void launch() { new HashSet<>(this.lifecycleRegistry), workflowRegistry.getWorkflowSnapshot(), workflowRegistry.getInstanceSnapshot(), + workflowRegistry.getInternalWorkflowSnapshot(), + workflowRegistry.getInternalInstanceSnapshot(), queueRegistry.getSnapshot(), alertHandler); } @@ -462,6 +478,21 @@ public void sleep(@NonNull Duration duration) { return startWorkflow(runnable, null); } + /** + * Build a {@link Debouncer} that consolidates a series of calls on the same key into one + * execution of the targeted workflow using the most recent arguments. + * + *

The returned debouncer is immutable; configuration helpers like {@link + * Debouncer#withQueue(String)} and {@link Debouncer#withDebounceTimeout(java.time.Duration)} + * return new instances. + * + * @param the return type of the debounced workflow (used only for type inference) + * @return a fresh debouncer bound to this DBOS instance + */ + public @NonNull Debouncer debouncer() { + return new Debouncer<>(this, ensureLaunched("debouncer"), debouncerWorkflow); + } + /** * Returns the DBOS integration APIs for use by specialized integrations such as AOP aspects and * event listeners. diff --git a/transact/src/main/java/dev/dbos/transact/DBOSClient.java b/transact/src/main/java/dev/dbos/transact/DBOSClient.java index 5bffc3da..14996622 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOSClient.java +++ b/transact/src/main/java/dev/dbos/transact/DBOSClient.java @@ -815,6 +815,29 @@ public void sendBulk(@NonNull List messages, @Nullable SendOptions return new WorkflowHandleClient<>(workflowId); } + /** + * Find the workflow ID of the active workflow with the given queue and deduplication ID. + * + * @param queueName name of the queue to search + * @param deduplicationId deduplication ID to look up + * @return the workflow ID, or {@code null} if not found + */ + public @Nullable String findWorkflowIdByDeduplicationId( + @NonNull String queueName, @NonNull String deduplicationId) { + return systemDatabase.findWorkflowIdByDeduplicationId(queueName, deduplicationId); + } + + /** + * Create a {@link DebouncerClient} that coalesces repeated calls on the same key into a single + * execution of the named workflow. + * + * @param workflowName name of the workflow function to debounce + * @return a new DebouncerClient bound to this client + */ + public @NonNull DebouncerClient debouncer(@NonNull String workflowName) { + return new DebouncerClient<>(this, workflowName); + } + /** * Cancel a worflow * diff --git a/transact/src/main/java/dev/dbos/transact/DebouncerClient.java b/transact/src/main/java/dev/dbos/transact/DebouncerClient.java new file mode 100644 index 00000000..e76d0bfb --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/DebouncerClient.java @@ -0,0 +1,306 @@ +package dev.dbos.transact; + +import dev.dbos.transact.exceptions.DBOSQueueDuplicatedException; +import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.WorkflowHandle; +import dev.dbos.transact.workflow.internal.DebouncerContextOptions; +import dev.dbos.transact.workflow.internal.DebouncerMessage; +import dev.dbos.transact.workflow.internal.DebouncerOptions; + +import java.time.Duration; +import java.util.Objects; +import java.util.UUID; + +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Debounces repeated workflow invocations from an external client into a single execution using the + * most recent arguments, without requiring a running {@link DBOS} instance on the caller's side. + * + *

Create instances via {@link DBOSClient#debouncer(String)}. + * + *

Example

+ * + *
{@code
+ * var client = new DBOSClient(url, user, password);
+ *
+ * var debouncer = client.debouncer("process")
+ *     .withClassName(MyServiceImpl.class.getName())
+ *     .withDebounceTimeout(Duration.ofMinutes(5));
+ *
+ * WorkflowHandle handle =
+ *     debouncer.debounce("user-42", Duration.ofSeconds(2), "payload");
+ * String result = handle.getResult();
+ * }
+ * + * @param return type of the debounced workflow + */ +public final class DebouncerClient { + + private static final Logger logger = LoggerFactory.getLogger(DebouncerClient.class); + private static final Duration ACK_TIMEOUT = Duration.ofSeconds(1); + + private final DBOSClient client; + private final String workflowName; + private final @Nullable String className; + private final @Nullable String instanceName; + private final @Nullable String userQueueName; + private final @Nullable Duration debounceTimeout; + // Context options forwarded to the user workflow + private final @Nullable String appVersion; + private final @Nullable Integer priority; + private final @Nullable String userDeduplicationId; + private final @Nullable Duration workflowTimeout; + + DebouncerClient(@NonNull DBOSClient client, @NonNull String workflowName) { + this(client, workflowName, null, null, null, null, null, null, null, null); + } + + private DebouncerClient( + DBOSClient client, + String workflowName, + @Nullable String className, + @Nullable String instanceName, + @Nullable String userQueueName, + @Nullable Duration debounceTimeout, + @Nullable String appVersion, + @Nullable Integer priority, + @Nullable String userDeduplicationId, + @Nullable Duration workflowTimeout) { + this.client = Objects.requireNonNull(client, "client must not be null"); + this.workflowName = Objects.requireNonNull(workflowName, "workflowName must not be null"); + this.className = className; + this.instanceName = instanceName; + this.userQueueName = userQueueName; + this.debounceTimeout = debounceTimeout; + this.appVersion = appVersion; + this.priority = priority; + this.userDeduplicationId = userDeduplicationId; + this.workflowTimeout = workflowTimeout; + } + + /** Specify the Java class name of the target workflow implementation. */ + public @NonNull DebouncerClient withClassName(@Nullable String className) { + return new DebouncerClient<>( + client, + workflowName, + className, + instanceName, + userQueueName, + debounceTimeout, + appVersion, + priority, + userDeduplicationId, + workflowTimeout); + } + + /** Specify the DBOS instance name of the target workflow implementation. */ + public @NonNull DebouncerClient withInstanceName(@Nullable String instanceName) { + return new DebouncerClient<>( + client, + workflowName, + className, + instanceName, + userQueueName, + debounceTimeout, + appVersion, + priority, + userDeduplicationId, + workflowTimeout); + } + + /** + * Set the queue that the user workflow will be enqueued on when the debounce period elapses. + * {@code null} starts the user workflow directly (not enqueued). + */ + public @NonNull DebouncerClient withQueue(@Nullable String queueName) { + if (queueName != null && queueName.isEmpty()) { + throw new IllegalArgumentException("queueName must not be empty"); + } + return new DebouncerClient<>( + client, + workflowName, + className, + instanceName, + queueName, + debounceTimeout, + appVersion, + priority, + userDeduplicationId, + workflowTimeout); + } + + /** See {@link #withQueue(String)}. */ + public @NonNull DebouncerClient withQueue(@NonNull Queue queue) { + return withQueue(queue.name()); + } + + /** + * Set an absolute cap on how long the debouncer may keep absorbing calls for a single key. After + * this duration the user workflow fires even if more calls keep arriving. + */ + public @NonNull DebouncerClient withDebounceTimeout(@Nullable Duration debounceTimeout) { + return new DebouncerClient<>( + client, + workflowName, + className, + instanceName, + userQueueName, + debounceTimeout, + appVersion, + priority, + userDeduplicationId, + workflowTimeout); + } + + /** Target a specific application version for the user workflow. */ + public @NonNull DebouncerClient withAppVersion(@Nullable String appVersion) { + return new DebouncerClient<>( + client, + workflowName, + className, + instanceName, + userQueueName, + debounceTimeout, + appVersion, + priority, + userDeduplicationId, + workflowTimeout); + } + + /** Set the priority for the user workflow (only used when a queue is configured). */ + public @NonNull DebouncerClient withPriority(@Nullable Integer priority) { + return new DebouncerClient<>( + client, + workflowName, + className, + instanceName, + userQueueName, + debounceTimeout, + appVersion, + priority, + userDeduplicationId, + workflowTimeout); + } + + /** Set a deduplication ID to be forwarded to the user workflow. */ + public @NonNull DebouncerClient withDeduplicationId(@Nullable String deduplicationId) { + return new DebouncerClient<>( + client, + workflowName, + className, + instanceName, + userQueueName, + debounceTimeout, + appVersion, + priority, + deduplicationId, + workflowTimeout); + } + + /** Set a timeout for the user workflow. */ + public @NonNull DebouncerClient withTimeout(@Nullable Duration timeout) { + return new DebouncerClient<>( + client, + workflowName, + className, + instanceName, + userQueueName, + debounceTimeout, + appVersion, + priority, + userDeduplicationId, + timeout); + } + + /** + * Debounce a workflow invocation. + * + * @param debounceKey key that groups concurrent calls; calls with the same key are coalesced + * @param debouncePeriod inactivity window before the user workflow runs; each call resets it + * @param args positional arguments to pass to the user workflow + * @return handle pointing to the user workflow that will run with the latest arguments; on the + * deduplication path the handle ID is the child ID published by the running debouncer, not + * the locally generated UUID + */ + public @NonNull WorkflowHandle debounce( + @NonNull String debounceKey, @NonNull Duration debouncePeriod, Object... args) { + + Objects.requireNonNull(debounceKey, "debounceKey must not be null"); + Objects.requireNonNull(debouncePeriod, "debouncePeriod must not be null"); + if (debouncePeriod.isNegative() || debouncePeriod.isZero()) { + throw new IllegalArgumentException("debouncePeriod must be a positive non-zero duration"); + } + // className is required: the debouncer workflow uses it to look up the registered workflow. + if (className == null) { + throw new IllegalStateException( + "className is required; call withClassName(MyServiceImpl.class.getName()) before debounce()"); + } + + // Not inside a workflow, so UUIDs can be generated directly (no step wrapping needed). + String userWorkflowId = UUID.randomUUID().toString(); + String messageId = UUID.randomUUID().toString(); + String deduplicationId = workflowName + "-" + debounceKey; + + DebouncerOptions debouncerOpts = + new DebouncerOptions( + workflowName, + className, + instanceName, + userQueueName, + debounceTimeout, + appVersion, + priority, + userDeduplicationId); + DebouncerContextOptions ctx = new DebouncerContextOptions(userWorkflowId, workflowTimeout); + DebouncerMessage initial = new DebouncerMessage(messageId, args, debouncePeriod); + + var enqueueOpts = + new DBOSClient.EnqueueOptions( + Constants.DEBOUNCER_WORKFLOW_NAME, + Constants.DEBOUNCER_CLASS_NAME, + Constants.DBOS_INTERNAL_QUEUE) + .withDeduplicationId(deduplicationId); + + while (true) { + try { + client.enqueueWorkflow(enqueueOpts, new Object[] {debouncerOpts, ctx, initial}); + return client.retrieveWorkflow(userWorkflowId); + } catch (DBOSQueueDuplicatedException dup) { + // A debouncer for this key is already running — forward the latest args to it. + String existingDebouncerId = + client.findWorkflowIdByDeduplicationId(Constants.DBOS_INTERNAL_QUEUE, deduplicationId); + if (existingDebouncerId == null) { + logger.debug( + "Debouncer for dedupId {} not found after conflict; retrying", deduplicationId); + continue; + } + + DebouncerMessage msg = new DebouncerMessage(messageId, args, debouncePeriod); + client.send(existingDebouncerId, msg, Constants.DEBOUNCER_TOPIC, messageId); + + var ack = client.getEvent(existingDebouncerId, messageId, ACK_TIMEOUT); + if (ack.isEmpty()) { + logger.debug( + "Debouncer {} did not ack message {}; retrying", existingDebouncerId, messageId); + continue; + } + + // DEBOUNCER_CHILD_ID_KEY is published as the debouncer's first action, before the + // recv-loop. If the ack arrived the event should be available; retry if not to guard + // against transient delays. + var childIdOpt = + client.getEvent(existingDebouncerId, Constants.DEBOUNCER_CHILD_ID_KEY, ACK_TIMEOUT); + if (childIdOpt.isEmpty()) { + logger.debug( + "DEBOUNCER_CHILD_ID_KEY not yet available from {}; retrying", existingDebouncerId); + continue; + } + return client.retrieveWorkflow((String) childIdOpt.get()); + } + } + } +} diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java index 54280eed..5a85fd0a 100644 --- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -44,6 +44,7 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -359,6 +360,14 @@ public void recordWorkflowError(String workflowId, String error) { dbRetry(() -> WorkflowDAO.recordWorkflowError(ctx, workflowId, error)); } + /** + * Insert a workflow_status row already in the ERROR state, for a workflow that was never started. + * See {@link WorkflowDAO#recordErrorForUnstartedWorkflow}. + */ + public void recordErrorForUnstartedWorkflow(WorkflowStatusInternal initStatus, String error) { + dbRetry(() -> WorkflowDAO.recordErrorForUnstartedWorkflow(ctx, initStatus, error)); + } + public WorkflowStatus getWorkflowStatus(String workflowId) { return dbRetry(() -> WorkflowDAO.getWorkflowStatus(ctx, workflowId)); } @@ -371,6 +380,12 @@ public List listWorkflows(ListWorkflowsInput input) { return dbRetry(() -> WorkflowDAO.listWorkflows(ctx, input)); } + public @Nullable String findWorkflowIdByDeduplicationId( + String queueName, String deduplicationId) { + return dbRetry( + () -> WorkflowDAO.findWorkflowIdByDeduplicationId(ctx, queueName, deduplicationId)); + } + public List getWorkflowAggregates(GetWorkflowAggregatesInput input) { return dbRetry(() -> WorkflowDAO.getWorkflowAggregates(ctx, input)); } diff --git a/transact/src/main/java/dev/dbos/transact/database/dao/WorkflowDAO.java b/transact/src/main/java/dev/dbos/transact/database/dao/WorkflowDAO.java index 1e419fc2..fba3ff61 100644 --- a/transact/src/main/java/dev/dbos/transact/database/dao/WorkflowDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/dao/WorkflowDAO.java @@ -51,6 +51,7 @@ import java.util.UUID; import java.util.stream.Stream; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -373,6 +374,28 @@ public static void recordWorkflowError(DbContext ctx, String workflowId, String } } + /** + * Insert a workflow_status row and immediately mark it ERROR, for a workflow that was never + * actually started. Used when an internal workflow that is responsible for starting a user + * workflow fails before it can do so: without a status row, any handle awaiting the user workflow + * would poll {@link #awaitWorkflowResult} forever. + * + * @param initStatus metadata for the workflow that will be recorded as failed + * @param error the error serialized as json + */ + public static void recordErrorForUnstartedWorkflow( + DbContext ctx, WorkflowStatusInternal initStatus, String error) throws SQLException { + + // No explicit transaction: the calling debouncer workflow is itself durable, so a crash + // between these two statements is replayed and retried. ON CONFLICT makes the insert + // idempotent and the outcome update is safe to repeat. + try (var conn = ctx.getConnection()) { + insertWorkflowStatus(conn, ctx.schema(), initStatus, UUID.randomUUID().toString(), false); + updateWorkflowOutcome( + conn, ctx.schema(), initStatus.workflowId(), WorkflowState.ERROR, null, error); + } + } + public static String getWorkflowSerialization(DbContext ctx, String workflowId) throws SQLException { var sql = @@ -421,6 +444,32 @@ public static WorkflowStatus getWorkflowStatus( return null; } + /** + * Look up the workflow_uuid of the currently-enqueued or running workflow with a given + * (queue_name, deduplication_id) pair. Uses the UNIQUE index on that pair for O(1) lookup. + * Returns {@code null} if no active workflow with that deduplication id exists. + */ + public static @Nullable String findWorkflowIdByDeduplicationId( + DbContext ctx, String queueName, String deduplicationId) throws SQLException { + var sql = + """ + SELECT workflow_uuid + FROM "%s".workflow_status + WHERE queue_name = ? + AND deduplication_id = ? + LIMIT 1 + """ + .formatted(ctx.schema()); + try (var conn = ctx.getConnection(); + var stmt = conn.prepareStatement(sql)) { + stmt.setString(1, queueName); + stmt.setString(2, deduplicationId); + try (var rs = stmt.executeQuery()) { + return rs.next() ? rs.getString("workflow_uuid") : null; + } + } + } + public static void setWorkflowDelay(DbContext ctx, String workflowId, WorkflowDelay delay) throws SQLException { Objects.requireNonNull(workflowId, "workflowId must not be null"); diff --git a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java index 5c73c4da..d5161ff8 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java +++ b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java @@ -78,7 +78,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -95,7 +94,7 @@ public class DBOSExecutor implements AutoCloseable { // Invocation and hookHolder are used by startWorkflow to capture // workflow information w/o executing workflow function - record Invocation( + public record Invocation( DBOSExecutor executor, String workflowName, String className, @@ -121,6 +120,7 @@ public String fqName() { private Set listeners; private Map workflowMap; private Map instanceMap; + private Map internalWorkflowMap; private Map queueMap; private ConcurrentHashMap workflowsInProgress = new ConcurrentHashMap<>(); @@ -166,6 +166,8 @@ public void start( Set listenerSet, Map workflowMap, Map instanceMap, + Map internalWorkflowMap, + Map internalInstanceMap, List queues, AlertHandler alertHandler) { @@ -174,6 +176,7 @@ public void start( this.workflowMap = workflowMap; this.instanceMap = instanceMap; + this.internalWorkflowMap = internalWorkflowMap; this.queueMap = queues.stream().collect(Collectors.toUnmodifiableMap(Queue::name, queue -> queue)); this.listeners = listenerSet; @@ -321,6 +324,7 @@ public void close() { this.workflowMap = null; this.instanceMap = null; + this.internalWorkflowMap = null; logger.debug("DBOS Executor stopped"); } @@ -351,6 +355,11 @@ SystemDatabase getSystemDatabase() { return systemDatabase; } + public @Nullable String findWorkflowIdByDeduplicationId( + String queueName, String deduplicationId) { + return systemDatabase.findWorkflowIdByDeduplicationId(queueName, deduplicationId); + } + QueueService getQueueService() { return queueService; } @@ -390,9 +399,13 @@ public Collection getRegisteredWorkflowInstances() { } public Optional getRegisteredWorkflow( - String workflowName, String className, String instanceName) { + String workflowName, String className, @Nullable String instanceName) { var fqName = RegisteredWorkflow.fullyQualifiedName(workflowName, className, instanceName); - return Optional.ofNullable(this.workflowMap.get(fqName)); + var wf = this.workflowMap.get(fqName); + if (wf == null) { + wf = this.internalWorkflowMap.get(fqName); + } + return Optional.ofNullable(wf); } public Optional findQueue(String queueName) { @@ -1083,7 +1096,7 @@ public T runStep( return runStepInternal(step, options, childWorkflowId); } - private T runDbosFunctionAsStep( + public T runDbosFunctionAsStep( @NonNull ThrowingSupplier step, @NonNull String stepName, @Nullable String childWorkflowId) @@ -1291,6 +1304,36 @@ public Object runWorkflow( } } + /** + * Capture the workflow invocation triggered by the supplied lambda without executing the + * workflow. The lambda must call exactly one @Workflow method on a registered proxy on this + * executor. + */ + public Invocation captureInvocation(ThrowingSupplier wfLambda) { + AtomicReference capturedInvocation = new AtomicReference<>(); + DBOSExecutor.hookHolder.set( + (invocation) -> { + if (!capturedInvocation.compareAndSet(null, invocation)) { + throw new RuntimeException("Only one @Workflow can be called in the captured lambda"); + } + }); + try { + wfLambda.execute(); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + DBOSExecutor.hookHolder.remove(); + } + var invocation = + Objects.requireNonNull( + capturedInvocation.get(), "The lambda must call exactly one @Workflow"); + if (invocation.executor() != this) { + throw new IllegalStateException( + "The @Workflow method must be called on the DBOS instance passed to the lambda"); + } + return invocation; + } + // execute a workflow via startWorkflow public WorkflowHandle startWorkflow( ThrowingSupplier wfLambda, StartWorkflowOptions options) { @@ -1303,35 +1346,7 @@ public WorkflowHandle startWorkflow( throw new IllegalArgumentException("explicit timeout and deadline cannot both be set"); } - // set the invocation hook holder and invoke the lambda to retrieve the invocation information - Function, Invocation> invocationSupplier = - (lambda) -> { - AtomicReference capturedInvocation = new AtomicReference<>(); - DBOSExecutor.hookHolder.set( - (invocation) -> { - if (!capturedInvocation.compareAndSet(null, invocation)) { - throw new RuntimeException( - "Only one @Workflow can be called in the startWorkflow lambda"); - } - }); - - try { - lambda.execute(); - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - DBOSExecutor.hookHolder.remove(); - } - - return Objects.requireNonNull( - capturedInvocation.get(), "The startWorkflow lambda must call exactly one @Workflow"); - }; - - var invocation = invocationSupplier.apply(wfLambda); - if (invocation.executor() != this) { - throw new IllegalStateException( - "The @Workflow method must be called on the DBOS instance passed to the startWorkflow lambda"); - } + var invocation = captureInvocation(wfLambda); var workflow = getRegisteredWorkflow( invocation.workflowName(), invocation.className(), invocation.instanceName()) @@ -1415,7 +1430,9 @@ public WorkflowHandle executeWorkflowById( var wfName = RegisteredWorkflow.fullyQualifiedName( status.workflowName(), status.className(), status.instanceName()); - RegisteredWorkflow workflow = workflowMap.get(wfName); + RegisteredWorkflow workflow = + getRegisteredWorkflow(status.workflowName(), status.className(), status.instanceName()) + .orElse(null); if (workflow == null) { throw new DBOSWorkflowFunctionNotFoundException(workflowId, wfName); @@ -1480,7 +1497,7 @@ private void validateWorkflow(String workflowName, String className) { private void validateWorkflow(String workflowName, String className, String instanceName) { var fqName = RegisteredWorkflow.fullyQualifiedName(workflowName, className, instanceName); - if (!workflowMap.containsKey(fqName)) { + if (!workflowMap.containsKey(fqName) && !internalWorkflowMap.containsKey(fqName)) { throw new IllegalStateException("Workflow function %s is not registered".formatted(fqName)); } } @@ -1569,15 +1586,12 @@ private WorkflowHandle executeWorkflow( if (options.deduplicationId() != null) { badOptionList.add("deduplicationId"); } - if (options.priority() != null) { badOptionList.add("priority"); } - if (options.queuePartitionKey() != null) { badOptionList.add("queuePartitionKey"); } - if (options.delay() != null) { badOptionList.add("delay"); } @@ -1892,4 +1906,50 @@ private void persistWorkflowError(String workflowId, Throwable error, String ser var serialized = SerializationUtil.serializeError(error, serialization, this.serializer); systemDatabase.recordWorkflowError(workflowId, serialized.serializedValue()); } + + /** + * Record an ERROR result for a workflow that was never started, so that handles awaiting it fail + * fast instead of polling forever. Used by internal workflows (e.g. the debouncer) that take + * responsibility for starting a user workflow and must surface their own failures to the caller's + * handle when they cannot. + */ + public void recordErrorForUnstartedWorkflow( + String workflowId, + String workflowName, + String className, + @Nullable String instanceName, + @Nullable Object[] args, + Throwable error) { + String serialization = this.serializer.name(); + var serializedArgs = + SerializationUtil.serializeArgs( + Objects.requireNonNullElseGet(args, () -> new Object[0]), + null, + serialization, + this.serializer); + var serializedError = SerializationUtil.serializeError(error, serialization, this.serializer); + var initStatus = + new WorkflowStatusInternal( + workflowId, + workflowName, + className, + instanceName, + null, + null, + null, + null, + null, + null, + null, + null, + serializedArgs.serializedValue(), + executorId(), + appVersion(), + appId(), + null, + null, + null, + serializedArgs.serialization()); + systemDatabase.recordErrorForUnstartedWorkflow(initStatus, serializedError.serializedValue()); + } } diff --git a/transact/src/main/java/dev/dbos/transact/execution/ExecutionOptions.java b/transact/src/main/java/dev/dbos/transact/execution/ExecutionOptions.java index c4e953bb..0340c96a 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/ExecutionOptions.java +++ b/transact/src/main/java/dev/dbos/transact/execution/ExecutionOptions.java @@ -126,6 +126,38 @@ public ExecutionOptions withSerialization(String serialization) { serialization); } + public ExecutionOptions withPriority(Integer priority) { + return new ExecutionOptions( + this.workflowId, + this.timeout, + this.deadline, + this.queueName, + this.deduplicationId, + priority, + this.queuePartitionKey, + this.delay, + this.appVersion, + this.isRecoveryRequest, + this.isDequeuedRequest, + this.serialization); + } + + public ExecutionOptions withAppVersion(String appVersion) { + return new ExecutionOptions( + this.workflowId, + this.timeout, + this.deadline, + this.queueName, + this.deduplicationId, + this.priority, + this.queuePartitionKey, + this.delay, + appVersion, + this.isRecoveryRequest, + this.isDequeuedRequest, + this.serialization); + } + public Duration timeoutDuration() { if (timeout instanceof Timeout.Explicit e) { return e.value(); diff --git a/transact/src/main/java/dev/dbos/transact/internal/DBOSIntegration.java b/transact/src/main/java/dev/dbos/transact/internal/DBOSIntegration.java index 4abb6b1f..7b7e9c76 100644 --- a/transact/src/main/java/dev/dbos/transact/internal/DBOSIntegration.java +++ b/transact/src/main/java/dev/dbos/transact/internal/DBOSIntegration.java @@ -13,7 +13,6 @@ import java.lang.reflect.Method; import java.util.Collection; -import java.util.Collections; import java.util.Objects; import java.util.Optional; import java.util.function.Consumer; @@ -193,29 +192,29 @@ public Object runWorkflow( } /** - * Get all workflows registered with DBOS. + * Get all user-registered workflows. Internal/system workflows registered by DBOS itself (for + * example, the debouncer service workflow) are excluded. * - * @return list of all registered workflow methods + * @return list of all user-registered workflow methods */ public @NonNull Collection getRegisteredWorkflows() { var executor = executorSupplier.get(); - if (executor != null) { - return executor.getRegisteredWorkflows(); - } - return Collections.unmodifiableCollection(workflowRegistry.getWorkflowSnapshot().values()); + return executor != null + ? executor.getRegisteredWorkflows() + : workflowRegistry.getWorkflowSnapshot().values(); } /** - * Get all workflow instances registered with DBOS. + * Get all user-registered workflow instances. Internal/system instances registered by DBOS itself + * (for example, the debouncer service) are excluded. * - * @return list of all class instances containing registered workflow methods + * @return list of all user-registered class instances containing workflow methods */ public @NonNull Collection getRegisteredWorkflowInstances() { var executor = executorSupplier.get(); - if (executor != null) { - return executor.getRegisteredWorkflowInstances(); - } - return Collections.unmodifiableCollection(workflowRegistry.getInstanceSnapshot().values()); + return executor != null + ? executor.getRegisteredWorkflowInstances() + : workflowRegistry.getInstanceSnapshot().values(); } /** @@ -241,7 +240,7 @@ public Optional getRegisteredWorkflow( * @return an Optional containing the RegisteredWorkflow if found, otherwise empty */ public Optional getRegisteredWorkflow( - @NonNull String workflowName, @NonNull String className, @NonNull String instanceName) { + @NonNull String workflowName, @NonNull String className, @Nullable String instanceName) { var executor = executorSupplier.get(); if (executor != null) { return executor.getRegisteredWorkflow(workflowName, className, instanceName); diff --git a/transact/src/main/java/dev/dbos/transact/internal/WorkflowRegistry.java b/transact/src/main/java/dev/dbos/transact/internal/WorkflowRegistry.java index c9639256..4abeac76 100644 --- a/transact/src/main/java/dev/dbos/transact/internal/WorkflowRegistry.java +++ b/transact/src/main/java/dev/dbos/transact/internal/WorkflowRegistry.java @@ -34,12 +34,27 @@ public class WorkflowRegistry { new ConcurrentHashMap<>(); private final ConcurrentHashMap wfRegistry = new ConcurrentHashMap<>(); + private final ConcurrentHashMap internalWfInstRegistry = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap internalWfRegistry = + new ConcurrentHashMap<>(); public void registerInstance(@Nullable String instanceName, @NonNull Object target) { + registerInstance(instanceName, target, wfInstRegistry); + } + + public void registerInternalInstance(@NonNull Object target) { + registerInstance(null, target, internalWfInstRegistry); + } + + private static void registerInstance( + @Nullable String instanceName, + @NonNull Object target, + ConcurrentHashMap registry) { var className = getWorkflowClassName(target); var fqName = RegisteredWorkflowInstance.fullyQualifiedInstName(className, instanceName); var regClass = new RegisteredWorkflowInstance(className, instanceName, target); - var previous = wfInstRegistry.putIfAbsent(fqName, regClass); + var previous = registry.putIfAbsent(fqName, regClass); if (previous != null) { throw new IllegalStateException("Workflow class already registered with name: " + fqName); } @@ -53,6 +68,35 @@ public RegisteredWorkflow registerWorkflow( @NonNull Method method, @Nullable Integer maxRecoveryAttempts, @Nullable SerializationStrategy serializationStrategy) { + return registerWorkflow( + workflowName, + className, + instanceName, + target, + method, + maxRecoveryAttempts, + serializationStrategy, + wfRegistry); + } + + public RegisteredWorkflow registerInternalWorkflow( + @NonNull String workflowName, + @NonNull String className, + @NonNull Object target, + @NonNull Method method) { + return registerWorkflow( + workflowName, className, null, target, method, null, null, internalWfRegistry); + } + + private static RegisteredWorkflow registerWorkflow( + @NonNull String workflowName, + @NonNull String className, + @Nullable String instanceName, + @NonNull Object target, + @NonNull Method method, + @Nullable Integer maxRecoveryAttempts, + @Nullable SerializationStrategy serializationStrategy, + ConcurrentHashMap registry) { var fqName = RegisteredWorkflow.fullyQualifiedName(workflowName, className, instanceName); var regWorkflow = @@ -65,7 +109,7 @@ public RegisteredWorkflow registerWorkflow( Objects.requireNonNullElse(maxRecoveryAttempts, -1), Objects.requireNonNullElse(serializationStrategy, SerializationStrategy.DEFAULT)); - var previous = wfRegistry.putIfAbsent(fqName, regWorkflow); + var previous = registry.putIfAbsent(fqName, regWorkflow); if (previous != null) { throw new IllegalStateException("Workflow already registered with name: " + fqName); } @@ -79,4 +123,12 @@ public Map getWorkflowSnapshot() { public Map getInstanceSnapshot() { return Map.copyOf(wfInstRegistry); } + + public Map getInternalWorkflowSnapshot() { + return Map.copyOf(internalWfRegistry); + } + + public Map getInternalInstanceSnapshot() { + return Map.copyOf(internalWfInstRegistry); + } } diff --git a/transact/src/main/java/dev/dbos/transact/workflow/Debouncer.java b/transact/src/main/java/dev/dbos/transact/workflow/Debouncer.java new file mode 100644 index 00000000..c568f805 --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/workflow/Debouncer.java @@ -0,0 +1,323 @@ +package dev.dbos.transact.workflow; + +import dev.dbos.transact.Constants; +import dev.dbos.transact.DBOS; +import dev.dbos.transact.StartWorkflowOptions; +import dev.dbos.transact.context.DBOSContextHolder; +import dev.dbos.transact.exceptions.DBOSQueueDuplicatedException; +import dev.dbos.transact.execution.DBOSExecutor; +import dev.dbos.transact.execution.RegisteredWorkflow; +import dev.dbos.transact.execution.ThrowingRunnable; +import dev.dbos.transact.execution.ThrowingSupplier; +import dev.dbos.transact.workflow.internal.DebouncerContextOptions; +import dev.dbos.transact.workflow.internal.DebouncerMessage; +import dev.dbos.transact.workflow.internal.DebouncerOptions; + +import java.time.Duration; +import java.util.Objects; +import java.util.UUID; + +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Debounces a series of workflow invocations on the same key into a single execution that uses the + * most recently supplied arguments. + * + *

Each unique {@code debounceKey} maintains its own debouncer service workflow that absorbs + * incoming calls. The service workflow starts the actual user workflow after either {@code + * debouncePeriod} has elapsed since the last incoming call or the absolute {@code debounceTimeout} + * has expired. + * + *

The returned {@link WorkflowHandle} points to the user workflow that will eventually run with + * the latest arguments; polling it for {@code getResult()} waits for that workflow's outcome. + * + *

Example

+ * + *
{@code
+ * var dbos = new DBOS(config);
+ * var svc = dbos.registerProxy(MyService.class, new MyServiceImpl());
+ * dbos.launch();
+ *
+ * var debouncer = dbos.debouncer()
+ *     .withDebounceTimeout(Duration.ofMinutes(5));
+ *
+ * WorkflowHandle handle = debouncer.debounce(
+ *     "user-42",
+ *     Duration.ofSeconds(2),
+ *     () -> svc.process("payload"));
+ * String result = handle.getResult();
+ * }
+ * + * @param return type of the debounced workflow + */ +public final class Debouncer { + + private static final Logger logger = LoggerFactory.getLogger(Debouncer.class); + + /** + * How long to wait for the debouncer service workflow to acknowledge a forwarded message before + * retrying. + */ + private static final Duration ACK_TIMEOUT = Duration.ofSeconds(1); + + private record DebounceIds(String userWorkflowId, String messageId) {} + + private final DBOS dbos; + private final DBOSExecutor executor; + private final RegisteredWorkflow debouncerWorkflow; + private final @Nullable String queueName; + private final @Nullable Duration debounceTimeout; + private final @Nullable String appVersion; + private final @Nullable Integer priority; + private final @Nullable String deduplicationId; + + public Debouncer( + @NonNull DBOS dbos, + @NonNull DBOSExecutor executor, + @NonNull RegisteredWorkflow debouncerWorkflow) { + this(dbos, executor, debouncerWorkflow, null, null, null, null, null); + } + + private Debouncer( + DBOS dbos, + DBOSExecutor executor, + RegisteredWorkflow debouncerWorkflow, + @Nullable String queueName, + @Nullable Duration debounceTimeout, + @Nullable String appVersion, + @Nullable Integer priority, + @Nullable String deduplicationId) { + this.dbos = Objects.requireNonNull(dbos, "dbos must not be null"); + this.executor = Objects.requireNonNull(executor, "executor must not be null"); + this.debouncerWorkflow = + Objects.requireNonNull(debouncerWorkflow, "debouncerWorkflow must not be null"); + this.queueName = queueName; + this.debounceTimeout = debounceTimeout; + this.appVersion = appVersion; + this.priority = priority; + this.deduplicationId = deduplicationId; + } + + /** + * Set the queue that the user workflow will be enqueued on when the debounce period elapses. + * {@code null} starts the user workflow directly (not enqueued). + */ + public @NonNull Debouncer withQueue(@Nullable String queueName) { + if (queueName != null && queueName.isEmpty()) { + throw new IllegalArgumentException("queueName must not be empty"); + } + return new Debouncer<>( + dbos, + executor, + debouncerWorkflow, + queueName, + debounceTimeout, + appVersion, + priority, + deduplicationId); + } + + /** See {@link #withQueue(String)}. */ + public @NonNull Debouncer withQueue(@NonNull Queue queue) { + return withQueue(queue.name()); + } + + /** + * Set an absolute cap on how long a debouncer for a single key may keep absorbing calls. After + * this duration elapses from the first call, the user workflow is started even if more calls keep + * arriving. + */ + public @NonNull Debouncer withDebounceTimeout(@Nullable Duration debounceTimeout) { + return new Debouncer<>( + dbos, + executor, + debouncerWorkflow, + queueName, + debounceTimeout, + appVersion, + priority, + deduplicationId); + } + + /** Target a specific application version for the user workflow. */ + public @NonNull Debouncer withAppVersion(@Nullable String appVersion) { + return new Debouncer<>( + dbos, + executor, + debouncerWorkflow, + queueName, + debounceTimeout, + appVersion, + priority, + deduplicationId); + } + + /** Set the priority for the user workflow (only applies when a queue is configured). */ + public @NonNull Debouncer withPriority(@Nullable Integer priority) { + return new Debouncer<>( + dbos, + executor, + debouncerWorkflow, + queueName, + debounceTimeout, + appVersion, + priority, + deduplicationId); + } + + /** Set a deduplication ID to be forwarded to the user workflow. */ + public @NonNull Debouncer withDeduplicationId(@Nullable String deduplicationId) { + return new Debouncer<>( + dbos, + executor, + debouncerWorkflow, + queueName, + debounceTimeout, + appVersion, + priority, + deduplicationId); + } + + /** + * Debounce a workflow with no return value. + * + * @param debounceKey key that groups concurrent calls; calls with the same key are coalesced + * @param debouncePeriod inactivity window before the user workflow runs; each call resets it + * @param wfLambda lambda calling exactly one {@code @Workflow} method + * @return handle to the future user workflow + */ + public @NonNull WorkflowHandle debounce( + @NonNull String debounceKey, + @NonNull Duration debouncePeriod, + @NonNull ThrowingRunnable wfLambda) { + return debounceInternal( + debounceKey, + debouncePeriod, + () -> { + wfLambda.execute(); + return null; + }); + } + + /** + * Debounce a workflow with a return value. + * + * @param debounceKey key that groups concurrent calls; calls with the same key are coalesced + * @param debouncePeriod inactivity window before the user workflow runs; each call resets it + * @param wfLambda lambda calling exactly one {@code @Workflow} method + * @return handle to the future user workflow + */ + public @NonNull WorkflowHandle debounce( + @NonNull String debounceKey, + @NonNull Duration debouncePeriod, + @NonNull ThrowingSupplier wfLambda) { + return debounceInternal(debounceKey, debouncePeriod, wfLambda); + } + + private WorkflowHandle debounceInternal( + @NonNull String debounceKey, + @NonNull Duration debouncePeriod, + @NonNull ThrowingSupplier wfLambda) { + + Objects.requireNonNull(debounceKey, "debounceKey must not be null"); + Objects.requireNonNull(debouncePeriod, "debouncePeriod must not be null"); + Objects.requireNonNull(wfLambda, "wfLambda must not be null"); + if (debouncePeriod.isNegative() || debouncePeriod.isZero()) { + throw new IllegalArgumentException("debouncePeriod must be a positive non-zero duration"); + } + + DBOSExecutor.Invocation invocation = executor.captureInvocation(wfLambda); + + // Inside a workflow, ID generation is wrapped in a step so replay is deterministic; + // runDbosFunctionAsStep runs the lambda directly when not in a workflow. + DebounceIds ids = + executor.runDbosFunctionAsStep( + () -> + new DebounceIds( + DBOSContextHolder.get().getNextWorkflowId(UUID.randomUUID().toString()), + UUID.randomUUID().toString()), + "DBOS.assignDebounceIds", + null); + String userWorkflowId = ids.userWorkflowId(); + String messageId = ids.messageId(); + + String debouncerDeduplicationId = invocation.workflowName() + "-" + debounceKey; + + DebouncerOptions options = + new DebouncerOptions( + invocation.workflowName(), + invocation.className(), + invocation.instanceName(), + queueName, + debounceTimeout, + appVersion, + priority, + deduplicationId); + Duration workflowTimeout = DBOS.inWorkflow() ? DBOSContextHolder.get().getTimeout() : null; + DebouncerContextOptions ctx = new DebouncerContextOptions(userWorkflowId, workflowTimeout); + DebouncerMessage initial = new DebouncerMessage(messageId, invocation.args(), debouncePeriod); + + while (true) { + try { + var startOpts = + new StartWorkflowOptions() + .withQueue(Constants.DBOS_INTERNAL_QUEUE) + .withDeduplicationId(debouncerDeduplicationId); + executor.startRegisteredWorkflow( + debouncerWorkflow, new Object[] {options, ctx, initial}, startOpts); + // Successfully enqueued a fresh debouncer for this key. + return dbos.retrieveWorkflow(userWorkflowId); + } catch (DBOSQueueDuplicatedException dup) { + // A debouncer for this key is already running. Forward the latest args to it. + // When called from inside a workflow, record the result as a durable step so that + // replay returns the same debouncer id and the subsequent send/getEvent steps stay + // deterministic. Mirrors Python's call_function_as_step("DBOS.get_deduplicated_workflow"). + String existingDebouncerId = + executor.runDbosFunctionAsStep( + () -> lookupExistingDebouncerId(debouncerDeduplicationId), + "DBOS.lookupDebouncer", + null); + if (existingDebouncerId == null) { + // The existing debouncer finished between the enqueue attempt and now. Retry from + // scratch — the next enqueue should succeed. + logger.debug( + "Debouncer for dedupId {} not found after conflict; retrying", + debouncerDeduplicationId); + continue; + } + DebouncerMessage msg = new DebouncerMessage(messageId, invocation.args(), debouncePeriod); + // messageId is the idempotency key — exactly-once delivery. + dbos.send(existingDebouncerId, msg, Constants.DEBOUNCER_TOPIC, messageId); + + // Wait for the debouncer to acknowledge receipt. If the debouncer exited before + // processing this message, no ack arrives — start over. + var ack = dbos.getEvent(existingDebouncerId, messageId, ACK_TIMEOUT); + if (ack.isEmpty()) { + logger.debug( + "Debouncer {} did not ack message {}; retrying", existingDebouncerId, messageId); + continue; + } + // CHILD_ID_KEY is set as the debouncer workflow's first action, before the recv-loop. + // If the ack arrived, the debouncer has already published this event — it cannot be empty. + var childId = + dbos.getEvent( + existingDebouncerId, Constants.DEBOUNCER_CHILD_ID_KEY, ACK_TIMEOUT) + .orElseThrow( + () -> + new IllegalStateException( + "Debouncer " + + existingDebouncerId + + " acked but did not publish " + + Constants.DEBOUNCER_CHILD_ID_KEY)); + return dbos.retrieveWorkflow(childId); + } + } + } + + private @Nullable String lookupExistingDebouncerId(String deduplicationId) { + return executor.findWorkflowIdByDeduplicationId(Constants.DBOS_INTERNAL_QUEUE, deduplicationId); + } +} diff --git a/transact/src/main/java/dev/dbos/transact/workflow/internal/DebouncerContextOptions.java b/transact/src/main/java/dev/dbos/transact/workflow/internal/DebouncerContextOptions.java new file mode 100644 index 00000000..f0fc3339 --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/workflow/internal/DebouncerContextOptions.java @@ -0,0 +1,19 @@ +package dev.dbos.transact.workflow.internal; + +import java.time.Duration; + +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; + +/** + * Per-call context forwarded from the caller of {@code Debouncer.debounce()} to the debouncer + * service workflow. Contains only values that are inherently call-specific: the pre-assigned user + * workflow ID and the caller's active timeout (if any). + * + *

Enqueue-time options (appVersion, priority, deduplicationId) are carried in {@link + * DebouncerOptions} instead, where they are set explicitly via the Debouncer builder. + * + *

Not part of the public API. + */ +public record DebouncerContextOptions( + @NonNull String userWorkflowId, @Nullable Duration workflowTimeout) {} diff --git a/transact/src/main/java/dev/dbos/transact/workflow/internal/DebouncerMessage.java b/transact/src/main/java/dev/dbos/transact/workflow/internal/DebouncerMessage.java new file mode 100644 index 00000000..5c8f836e --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/workflow/internal/DebouncerMessage.java @@ -0,0 +1,15 @@ +package dev.dbos.transact.workflow.internal; + +import java.time.Duration; + +import org.jspecify.annotations.NonNull; + +/** + * Message sent from a {@code Debouncer} caller to the debouncer service workflow each time the + * debounce key fires. The debouncer service workflow uses the most recently received message's args + * when it eventually starts the user workflow. + * + *

Not part of the public API — the debouncer infrastructure consumes this directly. + */ +public record DebouncerMessage( + @NonNull String messageId, @NonNull Object[] args, @NonNull Duration debouncePeriod) {} diff --git a/transact/src/main/java/dev/dbos/transact/workflow/internal/DebouncerOptions.java b/transact/src/main/java/dev/dbos/transact/workflow/internal/DebouncerOptions.java new file mode 100644 index 00000000..e710e248 --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/workflow/internal/DebouncerOptions.java @@ -0,0 +1,22 @@ +package dev.dbos.transact.workflow.internal; + +import java.time.Duration; + +import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; + +/** + * Inputs to the debouncer service workflow that identify the user workflow to be eventually started + * and configures how it should be enqueued. + * + *

Not part of the public API. + */ +public record DebouncerOptions( + @NonNull String workflowName, + @NonNull String className, + @Nullable String instanceName, + @Nullable String queueName, + @Nullable Duration debounceTimeout, + @Nullable String appVersion, + @Nullable Integer priority, + @Nullable String deduplicationId) {} diff --git a/transact/src/main/java/dev/dbos/transact/workflow/internal/InternalWorkflows.java b/transact/src/main/java/dev/dbos/transact/workflow/internal/InternalWorkflows.java new file mode 100644 index 00000000..848bc59f --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/workflow/internal/InternalWorkflows.java @@ -0,0 +1,141 @@ +package dev.dbos.transact.workflow.internal; + +import dev.dbos.transact.Constants; +import dev.dbos.transact.DBOS; +import dev.dbos.transact.StartWorkflowOptions; +import dev.dbos.transact.exceptions.DBOSWorkflowFunctionNotFoundException; +import dev.dbos.transact.execution.DBOSExecutor; +import dev.dbos.transact.execution.RegisteredWorkflow; + +import java.lang.reflect.Method; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Built-in workflows registered by DBOS itself. Currently holds the debouncer service workflow. + * + *

Not part of the public API. + */ +public class InternalWorkflows { + + private static final Logger logger = LoggerFactory.getLogger(InternalWorkflows.class); + + private final DBOS dbos; + private final Supplier executorSupplier; + + public InternalWorkflows(DBOS dbos, Supplier executorSupplier) { + this.dbos = dbos; + this.executorSupplier = executorSupplier; + } + + /** + * Returns the {@link Method} reference for {@link #debouncerWorkflow}, used by DBOS at startup to + * register the workflow without relying on reflection over {@code @Workflow} annotations. + */ + public static Method debouncerWorkflowMethod() { + try { + return InternalWorkflows.class.getDeclaredMethod( + "debouncerWorkflow", + DebouncerOptions.class, + DebouncerContextOptions.class, + DebouncerMessage.class); + } catch (NoSuchMethodException e) { + throw new IllegalStateException("debouncerWorkflow method missing", e); + } + } + + public void debouncerWorkflow( + DebouncerOptions options, DebouncerContextOptions ctx, DebouncerMessage initial) { + + // Publish the pre-assigned user workflow id as an event so callers on the deduplication path + // can retrieve it via getEvent without having to parse workflow inputs. + dbos.setEvent(Constants.DEBOUNCER_CHILD_ID_KEY, ctx.userWorkflowId()); + + // Record the absolute deadline once as a durable step. On recovery this returns the same + // value so the loop's exit condition is replay-stable across crashes. + long deadlineEpochMs = + dbos.runStep( + () -> + options.debounceTimeout() == null + ? Long.MAX_VALUE + : Instant.now().plus(options.debounceTimeout()).toEpochMilli(), + "DBOS.debouncerComputeDeadline"); + + Object[] latestArgs = initial.args(); + Duration debouncePeriod = initial.debouncePeriod(); + + DBOSExecutor executor = executorSupplier.get(); + if (executor == null) { + throw new IllegalStateException("DBOS has not been launched. debounceWorkflow cannot run."); + } + while (true) { + long nowEpochMs = dbos.runStep(() -> Instant.now().toEpochMilli(), "DBOS.debouncerNow"); + Duration remaining = Duration.ofMillis(deadlineEpochMs - nowEpochMs); + if (remaining.compareTo(Duration.ZERO) <= 0) { + break; + } + Duration waitDuration = remaining.compareTo(debouncePeriod) < 0 ? remaining : debouncePeriod; + + Optional msg = dbos.recv(Constants.DEBOUNCER_TOPIC, waitDuration); + if (msg.isEmpty()) { + break; + } + DebouncerMessage next = msg.get(); + latestArgs = next.args(); + debouncePeriod = next.debouncePeriod(); + // Acknowledge receipt so the sender knows the message was consumed by this loop iteration. + dbos.setEvent(next.messageId(), next.messageId()); + } + + Optional optWorkflow = + executor.getRegisteredWorkflow( + options.workflowName(), options.className(), options.instanceName()); + if (optWorkflow.isEmpty()) { + // The user workflow is not registered in this process (e.g. it was renamed/removed, or we + // are recovering on a build that no longer declares it). We can never start it, so record + // a terminal ERROR for the pre-assigned user workflow id. Otherwise any handle returned to + // the caller would poll getResult() forever, since the status row would never appear. + var notFound = + new DBOSWorkflowFunctionNotFoundException(ctx.userWorkflowId(), options.workflowName()); + logger.error( + "Debouncer cannot find registered user workflow {} (id={}); recording ERROR", + options.workflowName(), + ctx.userWorkflowId(), + notFound); + executor.recordErrorForUnstartedWorkflow( + ctx.userWorkflowId(), + options.workflowName(), + options.className(), + options.instanceName(), + latestArgs, + notFound); + return; + } + var workflow = optWorkflow.get(); + + // priority and deduplicationId are only valid for queued workflows; the executor + // throws IllegalArgumentException if they are set without a queue name. + boolean hasQueue = options.queueName() != null; + var startOpts = + new StartWorkflowOptions() + .withWorkflowId(ctx.userWorkflowId()) + .withQueue(options.queueName()) + .withDeduplicationId(hasQueue ? options.deduplicationId() : null) + .withPriority(hasQueue ? options.priority() : null) + .withAppVersion(options.appVersion()); + if (ctx.workflowTimeout() != null) { + startOpts = startOpts.withTimeout(ctx.workflowTimeout()); + } + + logger.debug( + "Debouncer starting user workflow {} (id={})", + options.workflowName(), + ctx.userWorkflowId()); + executor.startRegisteredWorkflow(workflow, latestArgs, startOpts); + } +} diff --git a/transact/src/test/java/dev/dbos/transact/client/DebouncerClientTest.java b/transact/src/test/java/dev/dbos/transact/client/DebouncerClientTest.java new file mode 100644 index 00000000..0fa17cb7 --- /dev/null +++ b/transact/src/test/java/dev/dbos/transact/client/DebouncerClientTest.java @@ -0,0 +1,139 @@ +package dev.dbos.transact.client; + +import static org.junit.jupiter.api.Assertions.*; + +import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSClient; +import dev.dbos.transact.DebouncerClient; +import dev.dbos.transact.config.DBOSConfig; +import dev.dbos.transact.utils.PgContainer; +import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.Workflow; +import dev.dbos.transact.workflow.WorkflowState; + +import java.time.Duration; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.AutoClose; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +interface ClientTargetService { + String process(String input); +} + +class ClientTargetServiceImpl implements ClientTargetService { + final AtomicInteger callCount = new AtomicInteger(); + final ConcurrentLinkedQueue callArgs = new ConcurrentLinkedQueue<>(); + + @Override + @Workflow + public String process(String input) { + callCount.incrementAndGet(); + callArgs.add(input); + return "result:" + input; + } +} + +public class DebouncerClientTest { + + @AutoClose final PgContainer pgContainer = new PgContainer(); + + DBOSConfig dbosConfig; + @AutoClose DBOS dbos; + @AutoClose HikariDataSource dataSource; + @AutoClose DBOSClient dbosClient; + + static final Queue USER_QUEUE = new Queue("client-user-queue"); + + ClientTargetServiceImpl serviceImpl; + + @BeforeEach + void beforeEach() { + dbosConfig = pgContainer.dbosConfig(); + dbos = new DBOS(dbosConfig); + dataSource = pgContainer.dataSource(); + + serviceImpl = new ClientTargetServiceImpl(); + dbos.registerProxy(ClientTargetService.class, serviceImpl); + dbos.registerQueue(USER_QUEUE); + dbos.launch(); + + dbosClient = + new DBOSClient(pgContainer.jdbcUrl(), pgContainer.username(), pgContainer.password()); + } + + private DebouncerClient debouncer() { + return dbosClient + .debouncer("process") + .withClassName(ClientTargetServiceImpl.class.getName()); + } + + @Test + void singleCallFiresOnce() throws Exception { + var handle = debouncer().debounce("key-1", Duration.ofMillis(500), "hello"); + assertEquals("result:hello", handle.getResult()); + assertEquals(1, serviceImpl.callCount.get()); + } + + @Test + void multipleCallsCoalesceToLatestArgs() throws Exception { + var d = debouncer(); + // Use a long period (3s) so the window cannot close between the three calls even on slow CI. + var h1 = d.debounce("key-2", Duration.ofSeconds(3), "v1"); + Thread.sleep(100); + var h2 = d.debounce("key-2", Duration.ofSeconds(3), "v2"); + Thread.sleep(100); + var h3 = d.debounce("key-2", Duration.ofSeconds(3), "v3"); + + String result = h3.getResult(); + assertEquals("result:v3", result); + assertEquals(h1.workflowId(), h2.workflowId()); + assertEquals(h2.workflowId(), h3.workflowId()); + assertEquals(1, serviceImpl.callCount.get()); + } + + @Test + void differentKeysFireIndependently() throws Exception { + var d = debouncer(); + var hA = d.debounce("key-A", Duration.ofMillis(400), "A"); + var hB = d.debounce("key-B", Duration.ofMillis(400), "B"); + + assertNotEquals(hA.workflowId(), hB.workflowId()); + assertEquals("result:A", hA.getResult()); + assertEquals("result:B", hB.getResult()); + assertEquals(2, serviceImpl.callCount.get()); + } + + @Test + void reDebounceAfterWindowCloses() throws Exception { + var d = debouncer(); + + var h1 = d.debounce("key-r", Duration.ofMillis(300), "first"); + assertEquals("result:first", h1.getResult()); + assertEquals(1, serviceImpl.callCount.get()); + + Thread.sleep(200); + + var h2 = d.debounce("key-r", Duration.ofMillis(300), "second"); + assertEquals("result:second", h2.getResult()); + assertEquals(2, serviceImpl.callCount.get()); + + assertNotEquals(h1.workflowId(), h2.workflowId()); + } + + @Test + void debouncerClientWithQueue() throws Exception { + var handle = + debouncer().withQueue(USER_QUEUE).debounce("key-q", Duration.ofMillis(400), "queued"); + + assertEquals("result:queued", handle.getResult()); + + var status = dbosClient.getWorkflowStatus(handle.workflowId()).orElseThrow(); + assertEquals(WorkflowState.SUCCESS, status.status()); + assertEquals(USER_QUEUE.name(), status.queueName()); + assertEquals(1, serviceImpl.callCount.get()); + } +} diff --git a/transact/src/test/java/dev/dbos/transact/workflow/DebouncerTest.java b/transact/src/test/java/dev/dbos/transact/workflow/DebouncerTest.java new file mode 100644 index 00000000..651182b4 --- /dev/null +++ b/transact/src/test/java/dev/dbos/transact/workflow/DebouncerTest.java @@ -0,0 +1,495 @@ +package dev.dbos.transact.workflow; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import dev.dbos.transact.Constants; +import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSTestAccess; +import dev.dbos.transact.config.DBOSConfig; +import dev.dbos.transact.utils.PgContainer; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.AutoClose; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class DebouncerTest { + + @AutoClose final PgContainer pgContainer = new PgContainer(); + + DBOSConfig dbosConfig; + @AutoClose DBOS dbos; + + // Per-instance counters so parallel test methods do not interfere. + public interface DebouncedService { + String process(String input); + + int callCount(); + + java.util.List callArgs(); + } + + public static class DebouncedServiceImpl implements DebouncedService { + private final AtomicInteger callCount = new AtomicInteger(); + private final ConcurrentLinkedQueue callArgs = new ConcurrentLinkedQueue<>(); + // When set, the workflow blocks here while running so tests can inspect its in-flight status. + volatile CountDownLatch gate; + + @Override + @Workflow + public String process(String input) { + callCount.incrementAndGet(); + callArgs.add(input); + if (gate != null) { + try { + // Ceiling only; the test counts the gate down as soon as it has observed the status. + // Must exceed the observation window so the workflow stays in-flight until then. + gate.await(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + return "result:" + input; + } + + @Override + public int callCount() { + return callCount.get(); + } + + @Override + public java.util.List callArgs() { + return java.util.List.copyOf(callArgs); + } + } + + DebouncedServiceImpl serviceImpl; + + @BeforeEach + void beforeEach() { + dbosConfig = pgContainer.dbosConfig(); + dbos = new DBOS(dbosConfig); + serviceImpl = new DebouncedServiceImpl(); + } + + @Test + public void singleCallFiresOnce() throws Exception { + DebouncedService svc = dbos.registerProxy(DebouncedService.class, serviceImpl); + dbos.launch(); + + var handle = + dbos.debouncer().debounce("user-1", Duration.ofSeconds(1), () -> svc.process("v1")); + String result = handle.getResult(); + assertEquals("result:v1", result); + assertEquals(1, serviceImpl.callCount()); + assertEquals(List.of("v1"), serviceImpl.callArgs()); + } + + @Test + public void multipleCallsCoalesceToLatestArgs() throws Exception { + DebouncedService svc = dbos.registerProxy(DebouncedService.class, serviceImpl); + dbos.launch(); + + var debouncer = dbos.debouncer(); + var h1 = debouncer.debounce("user-2", Duration.ofMillis(800), () -> svc.process("v1")); + Thread.sleep(200); + var h2 = debouncer.debounce("user-2", Duration.ofMillis(800), () -> svc.process("v2")); + Thread.sleep(200); + var h3 = debouncer.debounce("user-2", Duration.ofMillis(800), () -> svc.process("v3")); + + String result = h3.getResult(); + assertEquals("result:v3", result); + // The three handles all point to the same final user workflow. + assertEquals(h1.workflowId(), h2.workflowId()); + assertEquals(h2.workflowId(), h3.workflowId()); + assertEquals(1, serviceImpl.callCount()); + assertEquals(List.of("v3"), serviceImpl.callArgs()); + } + + @Test + public void absoluteTimeoutFiresEvenIfCallsKeepArriving() throws Exception { + DebouncedService svc = dbos.registerProxy(DebouncedService.class, serviceImpl); + dbos.launch(); + + var debouncer = dbos.debouncer().withDebounceTimeout(Duration.ofMillis(1500)); + + var first = debouncer.debounce("user-3", Duration.ofMillis(800), () -> svc.process("v1")); + String firstId = first.workflowId(); + + // Keep extending the period — the absolute timeout should still kick in. + long deadline = System.currentTimeMillis() + 3000; + while (System.currentTimeMillis() < deadline && serviceImpl.callCount() == 0) { + debouncer.debounce("user-3", Duration.ofMillis(800), () -> svc.process("vN")); + Thread.sleep(150); + } + + String result = first.getResult(); + assertTrue(result.startsWith("result:")); + assertEquals(1, serviceImpl.callCount()); + assertEquals(firstId, first.workflowId()); + } + + @Test + public void differentKeysFireIndependently() throws Exception { + DebouncedService svc = dbos.registerProxy(DebouncedService.class, serviceImpl); + dbos.launch(); + + var debouncer = dbos.debouncer(); + var hA = debouncer.debounce("key-A", Duration.ofMillis(500), () -> svc.process("A")); + var hB = debouncer.debounce("key-B", Duration.ofMillis(500), () -> svc.process("B")); + + assertNotEquals(hA.workflowId(), hB.workflowId()); + assertEquals("result:A", hA.getResult()); + assertEquals("result:B", hB.getResult()); + assertEquals(2, serviceImpl.callCount()); + } + + @Test + public void concurrentCallsCoalesceSafely() throws Exception { + DebouncedService svc = dbos.registerProxy(DebouncedService.class, serviceImpl); + dbos.launch(); + + var debouncer = dbos.debouncer(); + int n = 8; + var pool = Executors.newFixedThreadPool(n); + try { + var ready = new CountDownLatch(n); + var go = new CountDownLatch(1); + var results = new ConcurrentLinkedQueue(); + for (int i = 0; i < n; i++) { + final String arg = "v" + i; + pool.submit( + () -> { + ready.countDown(); + go.await(); + var h = + debouncer.debounce("user-conc", Duration.ofMillis(600), () -> svc.process(arg)); + results.add(h.workflowId()); + return null; + }); + } + ready.await(5, TimeUnit.SECONDS); + go.countDown(); + pool.shutdown(); + assertTrue(pool.awaitTermination(15, TimeUnit.SECONDS)); + + // All concurrent callers must resolve to the same future user workflow id. + String first = results.peek(); + assertTrue(results.stream().allMatch(first::equals), "All handles must share workflow id"); + + // Wait for the user workflow to complete. + dbos.retrieveWorkflow(first).getResult(); + // Exactly one user workflow executed. + assertEquals(1, serviceImpl.callCount()); + } finally { + pool.shutdownNow(); + } + } + + @Test + public void debouncerOnQueueRunsViaThatQueue() throws Exception { + Queue userQueue = new Queue("debouncer-user-queue"); + dbos.registerQueue(userQueue); + DebouncedService svc = dbos.registerProxy(DebouncedService.class, serviceImpl); + dbos.launch(); + + var debouncer = dbos.debouncer().withQueue(userQueue); + var handle = debouncer.debounce("user-q", Duration.ofMillis(500), () -> svc.process("queued")); + assertEquals("result:queued", handle.getResult()); + + var status = dbos.getWorkflowStatus(handle.workflowId()).orElseThrow(); + assertEquals(userQueue.name(), status.queueName()); + assertEquals(1, serviceImpl.callCount()); + } + + // Workflow with numeric parameters to verify type coercion through send/recv round-trip. + public interface NumericService { + long compute(long value, double factor); + } + + public static class NumericServiceImpl implements NumericService { + @Override + @Workflow + public long compute(long value, double factor) { + return (long) (value * factor); + } + } + + @Test + public void numericArgsRoundTripCorrectly() throws Exception { + NumericService svc = dbos.registerProxy(NumericService.class, new NumericServiceImpl()); + dbos.launch(); + + var debouncer = dbos.debouncer(); + // First call + var h1 = debouncer.debounce("num-key", Duration.ofMillis(600), () -> svc.compute(10L, 2.5)); + Thread.sleep(100); + // Second call overrides args — after period the workflow runs with these values + var h2 = debouncer.debounce("num-key", Duration.ofMillis(600), () -> svc.compute(7L, 3.0)); + + assertEquals(h1.workflowId(), h2.workflowId()); + Long result = h2.getResult(); + // 7 * 3.0 = 21 + assertEquals(21L, result); + } + + // Verify that debounce works for workflows with no return value. + public interface VoidService { + void doWork(String marker); + } + + public static class VoidServiceImpl implements VoidService { + final AtomicInteger callCount = new AtomicInteger(); + final ConcurrentLinkedQueue markers = new ConcurrentLinkedQueue<>(); + + @Override + @Workflow + public void doWork(String marker) { + callCount.incrementAndGet(); + markers.add(marker); + } + } + + @Test + public void debounceCoalescesCorrectly() throws Exception { + var impl = new VoidServiceImpl(); + VoidService svc = dbos.registerProxy(VoidService.class, impl); + dbos.launch(); + + var debouncer = dbos.debouncer(); + var h1 = debouncer.debounce("void-key", Duration.ofMillis(500), () -> svc.doWork("a")); + Thread.sleep(100); + var h2 = debouncer.debounce("void-key", Duration.ofMillis(500), () -> svc.doWork("b")); + + h2.getResult(); + assertEquals(h1.workflowId(), h2.workflowId()); + assertEquals(1, impl.callCount.get()); + assertEquals(List.of("b"), List.copyOf(impl.markers)); + } + + // Verify that absoluteTimeout fires with the LATEST args, not the first. + @Test + public void absoluteTimeoutUsesLatestArgs() throws Exception { + DebouncedService svc = dbos.registerProxy(DebouncedService.class, serviceImpl); + dbos.launch(); + + // Long period (5s) so normal expiry cannot fire; only the 1.5s absolute timeout can. + var debouncer = dbos.debouncer().withDebounceTimeout(Duration.ofMillis(1500)); + + var h = debouncer.debounce("abs-key", Duration.ofSeconds(5), () -> svc.process("first")); + Thread.sleep(500); + debouncer.debounce("abs-key", Duration.ofSeconds(5), () -> svc.process("last")); + + String result = h.getResult(); + assertEquals("result:last", result); + assertEquals(1, serviceImpl.callCount()); + assertEquals(List.of("last"), serviceImpl.callArgs()); + } + + public interface OrchestratorService { + String debounceWithPriority(String arg); + } + + public static class OrchestratorServiceImpl implements OrchestratorService { + private final DBOS dbos; + private final DebouncedService svc; + private final Queue userQueue; + + public OrchestratorServiceImpl(DBOS dbos, DebouncedService svc, Queue userQueue) { + this.dbos = dbos; + this.svc = svc; + this.userQueue = userQueue; + } + + @Override + @Workflow + public String debounceWithPriority(String arg) { + return dbos.debouncer() + .withQueue(userQueue) + .withPriority(42) + .debounce("prio-inner", Duration.ofMillis(400), () -> svc.process(arg)) + .getResult(); + } + } + + // Verify that explicit withPriority() on Debouncer is forwarded to the user workflow. + @Test + public void explicitPriorityForwardedToUserWorkflow() throws Exception { + Queue q = new Queue("prio-queue").withPriorityEnabled(true); + dbos.registerQueue(q); + DebouncedService svc = dbos.registerProxy(DebouncedService.class, serviceImpl); + var orch = + dbos.registerProxy(OrchestratorService.class, new OrchestratorServiceImpl(dbos, svc, q)); + dbos.launch(); + + var h = dbos.startWorkflow(() -> orch.debounceWithPriority("prio-val")); + assertEquals("result:prio-val", h.getResult()); + + var userWfStatus = + dbos + .listWorkflows( + new ListWorkflowsInput().withQueueName(q.name()).withWorkflowName("process")) + .stream() + .findFirst() + .orElse(null); + assertNotNull(userWfStatus, "user workflow 'process' not found on queue " + q.name()); + assertEquals(Integer.valueOf(42), userWfStatus.priority()); + } + + // Verify that a second debounce call after the first window closes starts a fresh window. + // Regression test for: deduplication_id is cleared to NULL on completion, so the UNIQUE + // constraint no longer blocks a new enqueue with the same key. + @Test + public void reDebounceAfterWindowCloses() throws Exception { + DebouncedService svc = dbos.registerProxy(DebouncedService.class, serviceImpl); + dbos.launch(); + + var debouncer = dbos.debouncer(); + + // First window + var h1 = debouncer.debounce("rekey", Duration.ofMillis(400), () -> svc.process("first")); + assertEquals("result:first", h1.getResult()); + assertEquals(1, serviceImpl.callCount()); + + // Wait long enough to ensure the first debouncer workflow has completed. + Thread.sleep(300); + + // Second window — must NOT livelock; must start a fresh debouncer. + var h2 = debouncer.debounce("rekey", Duration.ofMillis(400), () -> svc.process("second")); + assertEquals("result:second", h2.getResult()); + assertEquals(2, serviceImpl.callCount()); + + // Each window produces an independent user workflow. + assertNotEquals(h1.workflowId(), h2.workflowId()); + } + + // Recovering/replaying the internal debouncer workflow must be idempotent: it reuses the + // pre-assigned user workflow id and must not start a second user workflow execution. + @Test + public void recoveryDoesNotRestartUserWorkflow() throws Exception { + DebouncedService svc = dbos.registerProxy(DebouncedService.class, serviceImpl); + dbos.launch(); + + var handle = + dbos.debouncer() + .debounce("rec-key", Duration.ofMillis(300), () -> svc.process("v1")); + String userWorkflowId = handle.workflowId(); + assertEquals("result:v1", handle.getResult()); + assertEquals(1, serviceImpl.callCount()); + + // Simulate a crash where the debouncer ran but did not durably record completion: flip only + // the debouncer workflow back to PENDING (the user workflow stays SUCCESS) and recover it. + // The debouncer finishes asynchronously after starting the user workflow, so wait until it is + // SUCCESS before flipping — otherwise the flip would race its own completion. + var executor = DBOSTestAccess.getDbosExecutor(dbos); + awaitDebouncerFlippedToPending(Duration.ofSeconds(30)); + + var recovered = executor.recoverPendingWorkflows(List.of(executor.executorId())); + assertEquals(1, recovered.size()); + for (var h : recovered) { + h.getResult(); + } + + // Replay reused the same user workflow id and did not run the user workflow again. The count + // check is independent of timing: a second user workflow would create a row at enqueue/start + // time, before it could execute, so it would be caught even if its body had not run yet. + assertEquals(1, countWorkflowsByName("process")); + assertEquals(1, serviceImpl.callCount()); + assertEquals(List.of("v1"), serviceImpl.callArgs()); + WorkflowHandle userHandle = dbos.retrieveWorkflow(userWorkflowId); + assertEquals("result:v1", userHandle.getResult()); + assertEquals(WorkflowState.SUCCESS, userHandle.getStatus().status()); + } + + private int countWorkflowsByName(String name) throws SQLException { + var sql = "SELECT count(*) FROM dbos.workflow_status WHERE name = ?"; + try (Connection conn = pgContainer.dataSource().getConnection(); + PreparedStatement stmt = conn.prepareStatement(sql)) { + stmt.setString(1, name); + try (var rs = stmt.executeQuery()) { + rs.next(); + return rs.getInt(1); + } + } + } + + // withDeduplicationId must forward the id to the queued user workflow. + @Test + public void deduplicationIdForwardedToQueuedUserWorkflow() throws Exception { + DebouncedService svc = dbos.registerProxy(DebouncedService.class, serviceImpl); + Queue userQueue = new Queue("dedup-user-queue"); + dbos.registerQueue(userQueue); + serviceImpl.gate = new CountDownLatch(1); + dbos.launch(); + + String dedupId = "user-dedup-1"; + var handle = + dbos.debouncer() + .withQueue(userQueue) + .withDeduplicationId(dedupId) + .debounce("dd-key", Duration.ofMillis(300), () -> svc.process("v1")); + + // The user workflow blocks on the gate while running, so its deduplication_id is still set + // (it is cleared only on completion). Wait for it to appear, then assert it was forwarded. + String observed = awaitDeduplicationId(handle, Duration.ofSeconds(30)); + assertEquals(dedupId, observed); + + serviceImpl.gate.countDown(); + assertEquals("result:v1", handle.getResult()); + assertEquals(1, serviceImpl.callCount()); + } + + private String awaitDeduplicationId(WorkflowHandle handle, Duration timeout) + throws InterruptedException { + long deadline = System.currentTimeMillis() + timeout.toMillis(); + while (System.currentTimeMillis() < deadline) { + try { + var status = handle.getStatus(); + if (status != null && status.deduplicationId() != null) { + return status.deduplicationId(); + } + } catch (RuntimeException ignored) { + // status row not present yet + } + Thread.sleep(50); + } + throw new AssertionError("user workflow deduplicationId not observed within timeout"); + } + + // Flip the (completed) debouncer workflow back to PENDING, retrying until it has reached SUCCESS + // so the result is deterministic regardless of how the debouncer's async completion interleaves. + private void awaitDebouncerFlippedToPending(Duration timeout) throws Exception { + var sql = + "UPDATE dbos.workflow_status SET status = ?, queue_name = NULL, updated_at = ?" + + " WHERE name = ? AND status = ?"; + long deadline = System.currentTimeMillis() + timeout.toMillis(); + while (System.currentTimeMillis() < deadline) { + try (Connection conn = pgContainer.dataSource().getConnection(); + PreparedStatement stmt = conn.prepareStatement(sql)) { + stmt.setString(1, WorkflowState.PENDING.name()); + stmt.setLong(2, Instant.now().toEpochMilli()); + stmt.setString(3, Constants.DEBOUNCER_WORKFLOW_NAME); + stmt.setString(4, WorkflowState.SUCCESS.name()); + if (stmt.executeUpdate() == 1) { + return; + } + } + Thread.sleep(50); + } + throw new AssertionError("debouncer workflow did not reach SUCCESS within timeout"); + } +}