Skip to content

Commit d77f70f

Browse files
committed
Fail debounced workflow with ERROR instead of hanging when unregistered
Add recordErrorForUnstartedWorkflow so awaiting handles fail fast; validate registration up front in Debouncer.
1 parent ce9f7c5 commit d77f70f

8 files changed

Lines changed: 132 additions & 14 deletions

File tree

transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,14 @@ public void recordWorkflowError(String workflowId, String error) {
359359
dbRetry(() -> WorkflowDAO.recordWorkflowError(ctx, workflowId, error));
360360
}
361361

362+
/**
363+
* Insert a workflow_status row and mark it ERROR, for a workflow that was never started.
364+
* Runs {@link WorkflowDAO#recordErrorForUnstartedWorkflow} through dbRetry; see that method.
365+
*/
366+
public void recordErrorForUnstartedWorkflow(WorkflowStatusInternal initStatus, String error) {
367+
dbRetry(() -> WorkflowDAO.recordErrorForUnstartedWorkflow(ctx, initStatus, error));
368+
}
369+
362370
public WorkflowStatus getWorkflowStatus(String workflowId) {
363371
return dbRetry(() -> WorkflowDAO.getWorkflowStatus(ctx, workflowId));
364372
}

transact/src/main/java/dev/dbos/transact/database/dao/WorkflowDAO.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,28 @@ public static void recordWorkflowError(DbContext ctx, String workflowId, String
374374
}
375375
}
376376

377+
/**
378+
* Insert a workflow_status row and immediately mark it ERROR, for a workflow that was never
379+
* actually started. Used when an internal workflow that is responsible for starting a user
380+
* workflow fails before it can do so: without a status row, any handle awaiting the user
381+
* workflow would poll {@link #awaitWorkflowResult} forever.
382+
*
* @param ctx supplies the connection and the schema name used by both statements
383+
* @param initStatus metadata for the workflow that will be recorded as failed
384+
* @param error the serialized error, stored as the workflow's outcome
* @throws SQLException on database failure; nothing is caught here, so it propagates
385+
*/
386+
public static void recordErrorForUnstartedWorkflow(
387+
DbContext ctx, WorkflowStatusInternal initStatus, String error) throws SQLException {
388+
389+
// No explicit transaction: the calling debouncer workflow is itself durable, so a crash
390+
// between these two statements is replayed and retried. ON CONFLICT makes the insert
391+
// idempotent and the outcome update is safe to repeat.
// NOTE(review): the outcome update below runs unconditionally for this workflow id, so if a
// row for that id already existed it would presumably be set to ERROR as well -- confirm that
// callers only pass ids that were never started.
392+
try (var conn = ctx.getConnection()) {
393+
insertWorkflowStatus(conn, ctx.schema(), initStatus, UUID.randomUUID().toString(), false);
394+
updateWorkflowOutcome(
395+
conn, ctx.schema(), initStatus.workflowId(), WorkflowState.ERROR, null, error);
396+
}
397+
}
398+
377399
public static String getWorkflowSerialization(DbContext ctx, String workflowId)
378400
throws SQLException {
379401
var sql =

transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1882,4 +1882,50 @@ private void persistWorkflowError(String workflowId, Throwable error, String ser
18821882
var serialized = SerializationUtil.serializeError(error, serialization, this.serializer);
18831883
systemDatabase.recordWorkflowError(workflowId, serialized.serializedValue());
18841884
}
1885+
1886+
/**
1887+
* Record an ERROR result for a workflow that was never started, so that handles awaiting it fail
1888+
* fast instead of polling forever. Used by internal workflows (e.g. the debouncer) that take
1889+
* responsibility for starting a user workflow and must surface their own failures to the caller's
1890+
* handle when they cannot.
*
* @param workflowId id of the workflow to record as failed
* @param workflowName name of the workflow that could not be started
* @param className class name recorded alongside the workflow name
* @param instanceName optional instance name; may be null
* @param args workflow arguments to record; null is treated as an empty array
* @param error the failure to serialize and store as the workflow's outcome
1891+
*/
1892+
public void recordErrorForUnstartedWorkflow(
1893+
String workflowId,
1894+
String workflowName,
1895+
String className,
1896+
@Nullable String instanceName,
1897+
@Nullable Object[] args,
1898+
Throwable error) {
1899+
String serialization = this.serializer.name();
1900+
var serializedArgs =
1901+
SerializationUtil.serializeArgs(
1902+
Objects.requireNonNullElseGet(args, () -> new Object[0]),
1903+
null,
1904+
serialization,
1905+
this.serializer);
1906+
var serializedError = SerializationUtil.serializeError(error, serialization, this.serializer);
// Only the caller-supplied values, the serialized args and executorId/appVersion/appId are
// populated; every other positional constructor slot is passed as null.
1907+
var initStatus =
1908+
new WorkflowStatusInternal(
1909+
workflowId,
1910+
workflowName,
1911+
className,
1912+
instanceName,
1913+
null,
1914+
null,
1915+
null,
1916+
null,
1917+
null,
1918+
null,
1919+
null,
1920+
null,
1921+
serializedArgs.serializedValue(),
1922+
executorId(),
1923+
appVersion(),
1924+
appId(),
1925+
null,
1926+
null,
1927+
null,
1928+
serializedArgs.serialization());
1929+
systemDatabase.recordErrorForUnstartedWorkflow(initStatus, serializedError.serializedValue());
1930+
}
18851931
}

transact/src/main/java/dev/dbos/transact/internal/DBOSIntegration.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,23 @@ public RegisteredWorkflow registerInternalWorkflow(
195195
return executor("startRegisteredWorkflow").startRegisteredWorkflow(regWorkflow, args, options);
196196
}
197197

198+
/**
199+
* Record a terminal ERROR for a workflow that was never started, so handles awaiting it fail
200+
* fast instead of polling forever. Used by the built-in debouncer workflow when it cannot start
201+
* the user workflow it is responsible for.
*
* <p>Forwards every argument unchanged to the executor's method of the same name.
202+
*/
203+
public void recordErrorForUnstartedWorkflow(
204+
String workflowId,
205+
String workflowName,
206+
String className,
207+
@Nullable String instanceName,
208+
@Nullable Object[] args,
209+
Throwable error) {
210+
executor("recordErrorForUnstartedWorkflow")
211+
.recordErrorForUnstartedWorkflow(
212+
workflowId, workflowName, className, instanceName, args, error);
213+
}
214+
198215
/**
199216
* Execute a workflow method via its reflective {@link Method} handle. Intended for use by AOP
200217
* interceptors that capture workflow invocations at the proxy boundary.

transact/src/main/java/dev/dbos/transact/workflow/Debouncer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import dev.dbos.transact.StartWorkflowOptions;
66
import dev.dbos.transact.context.DBOSContextHolder;
77
import dev.dbos.transact.exceptions.DBOSQueueDuplicatedException;
8+
import dev.dbos.transact.exceptions.DBOSWorkflowFunctionNotFoundException;
89
import dev.dbos.transact.execution.DBOSExecutor;
910
import dev.dbos.transact.execution.RegisteredWorkflow;
1011
import dev.dbos.transact.execution.ThrowingRunnable;
@@ -250,6 +251,15 @@ private <T, E extends Exception> WorkflowHandle<T, E> debounceInternal(
250251
}
251252
String userWorkflowId = ids.userWorkflowId();
252253
String messageId = ids.messageId();
254+
255+
// Fail fast for the common programmer error of debouncing an unregistered workflow. Without
256+
// this the call would still succeed here and only fail later inside the debouncer workflow.
257+
if (executor
258+
.getRegisteredWorkflow(
259+
invocation.workflowName(), invocation.className(), invocation.instanceName())
260+
.isEmpty()) {
261+
throw new DBOSWorkflowFunctionNotFoundException(userWorkflowId, invocation.workflowName());
262+
}
253263
String debouncerDeduplicationId = invocation.workflowName() + "-" + debounceKey;
254264

255265
DebouncerOptions options =

transact/src/main/java/dev/dbos/transact/workflow/internal/InternalWorkflows.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import dev.dbos.transact.Constants;
44
import dev.dbos.transact.DBOS;
55
import dev.dbos.transact.StartWorkflowOptions;
6+
import dev.dbos.transact.exceptions.DBOSWorkflowFunctionNotFoundException;
7+
import dev.dbos.transact.execution.RegisteredWorkflow;
68

79
import java.lang.reflect.Method;
810
import java.time.Duration;
@@ -82,20 +84,33 @@ public void debouncerWorkflow(
8284
dbos.setEvent(next.messageId(), next.messageId());
8385
}
8486

85-
var workflow =
87+
Optional<RegisteredWorkflow> optWorkflow =
8688
dbos.integration()
8789
.getRegisteredWorkflow(
88-
options.workflowName(), options.className(), options.instanceName())
89-
.orElseThrow(
90-
() ->
91-
new IllegalStateException(
92-
"Debouncer cannot find registered user workflow: "
93-
+ options.workflowName()
94-
+ " / "
95-
+ options.className()
96-
+ (options.instanceName() == null
97-
? ""
98-
: " / " + options.instanceName())));
90+
options.workflowName(), options.className(), options.instanceName());
91+
if (optWorkflow.isEmpty()) {
92+
// The user workflow is not registered in this process (e.g. it was renamed/removed, or we
93+
// are recovering on a build that no longer declares it). We can never start it, so record
94+
// a terminal ERROR for the pre-assigned user workflow id. Otherwise any handle returned to
95+
// the caller would poll getResult() forever, since the status row would never appear.
96+
var notFound =
97+
new DBOSWorkflowFunctionNotFoundException(ctx.userWorkflowId(), options.workflowName());
98+
logger.error(
99+
"Debouncer cannot find registered user workflow {} (id={}); recording ERROR",
100+
options.workflowName(),
101+
ctx.userWorkflowId(),
102+
notFound);
103+
dbos.integration()
104+
.recordErrorForUnstartedWorkflow(
105+
ctx.userWorkflowId(),
106+
options.workflowName(),
107+
options.className(),
108+
options.instanceName(),
109+
latestArgs,
110+
notFound);
111+
return;
112+
}
113+
var workflow = optWorkflow.get();
99114

100115
// priority and deduplicationId are only valid for queued workflows; the executor
101116
// throws IllegalArgumentException if they are set without a queue name.

transact/src/test/java/dev/dbos/transact/client/DebouncerClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ void differentKeysFireIndependently() throws Exception {
108108
}
109109

110110
@Test
111-
void reDebouncAfterWindowCloses() throws Exception {
111+
void reDebounceAfterWindowCloses() throws Exception {
112112
var d = debouncer();
113113

114114
var h1 = d.debounce("key-r", Duration.ofMillis(300), "first");

transact/src/test/java/dev/dbos/transact/workflow/DebouncerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ public void explicitPriorityForwardedToUserWorkflow() throws Exception {
337337
// Regression test for: deduplication_id is cleared to NULL on completion, so the UNIQUE
338338
// constraint no longer blocks a new enqueue with the same key.
339339
@Test
340-
public void reDebouncAfterWindowCloses() throws Exception {
340+
public void reDebounceAfterWindowCloses() throws Exception {
341341
DebouncedService svc = dbos.registerProxy(DebouncedService.class, serviceImpl);
342342
dbos.launch();
343343

0 commit comments

Comments
 (0)