Skip to content

Commit 94713bc

Browse files
devhawkchuck-dbos
andauthored
checkChildWorkflow at top of executeWorkflow (#223)
`DBOSExecutor.executeWorkflow` was mistakenly only checking child workflow in the execute path, not the enqueue path. This PR updates `executeWorkflow` to `checkChildWorkflow` first before attempting to enqueue or execute the workflow. cc @tomasol fixes #218 --------- Co-authored-by: Chuck B <chuck.bear@dbos.dev>
1 parent c9f052c commit 94713bc

5 files changed

Lines changed: 193 additions & 15 deletions

File tree

transact/src/main/java/dev/dbos/transact/database/StepsDAO.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public static void recordStepResultTxn(
4242
}
4343

4444
public static void recordStepResultTxn(
45-
StepResult result, long startTimeEpochMs, Connection connection, String schema)
45+
StepResult result, Long startTimeEpochMs, Connection connection, String schema)
4646
throws SQLException {
4747

4848
Objects.requireNonNull(schema);
@@ -77,12 +77,14 @@ public static void recordStepResultTxn(
7777
pstmt.setNull(6, Types.VARCHAR);
7878
}
7979

80-
pstmt.setLong(7, startTimeEpochMs);
81-
pstmt.setLong(8, System.currentTimeMillis());
80+
pstmt.setObject(7, startTimeEpochMs);
81+
Long endTime = startTimeEpochMs == null ? null : System.currentTimeMillis();
82+
pstmt.setObject(8, endTime);
8283

8384
pstmt.executeUpdate();
8485

8586
} catch (SQLException e) {
87+
logger.debug("recordStepResultTxn error", e);
8688
if ("23505".equals(e.getSQLState())) {
8789
throw new DBOSWorkflowExecutionConflictException(result.workflowId());
8890
} else {

transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,7 @@ public void recordChildWorkflow(
646646

647647
var result = new StepResult(parentId, functionId, functionName).withChildWorkflowId(childId);
648648
try (Connection connection = dataSource.getConnection()) {
649-
StepsDAO.recordStepResultTxn(result, startTime, connection, schema);
649+
StepsDAO.recordStepResultTxn(result, null, connection, schema);
650650
}
651651
}
652652

transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -758,12 +758,13 @@ public <T, E extends Exception> WorkflowHandle<T, E> startWorkflow(
758758
CompletableFuture<String> future = new CompletableFuture<>();
759759
var newCtx = new DBOSContext(ctx, options, functionId, future);
760760

761-
Callable<T> task =
761+
Callable<Void> task =
762762
() -> {
763763
DBOSContextHolder.clear();
764764
try {
765765
DBOSContextHolder.set(newCtx);
766-
return supplier.execute();
766+
supplier.execute();
767+
return null;
767768
} finally {
768769
DBOSContextHolder.clear();
769770
}
@@ -890,6 +891,16 @@ public <T, E extends Exception> WorkflowHandle<T, E> executeWorkflow(
890891
WorkflowInfo parent,
891892
CompletableFuture<String> latch) {
892893

894+
if (parent != null) {
895+
var childId = systemDatabase.checkChildWorkflow(parent.workflowId(), parent.functionId());
896+
if (childId.isPresent()) {
897+
if (latch != null) {
898+
latch.complete(childId.get());
899+
}
900+
return retrieveWorkflow(childId.get());
901+
}
902+
}
903+
893904
Integer maxRetries = workflow.maxRecoveryAttempts() > 0 ? workflow.maxRecoveryAttempts() : null;
894905

895906
if (options.queueName() != null) {
@@ -912,13 +923,6 @@ public <T, E extends Exception> WorkflowHandle<T, E> executeWorkflow(
912923
var workflowId = options.workflowId();
913924
WorkflowInitResult initResult = null;
914925
try {
915-
if (parent != null) {
916-
var childId = systemDatabase.checkChildWorkflow(parent.workflowId(), parent.functionId());
917-
if (childId.isPresent()) {
918-
return retrieveWorkflow(childId.get());
919-
}
920-
}
921-
922926
initResult =
923927
preInvokeWorkflow(
924928
systemDatabase,
@@ -967,7 +971,15 @@ public <T, E extends Exception> WorkflowHandle<T, E> executeWorkflow(
967971
DBOSContextHolder.set(
968972
new DBOSContext(
969973
workflowId, parent, options.getTimeoutDuration(), options.deadline()));
974+
if (Thread.currentThread().isInterrupted()) {
975+
logger.debug("executeWorkflow task interrupted before workflow.invoke");
976+
return null;
977+
}
970978
T result = workflow.invoke(args);
979+
if (Thread.currentThread().isInterrupted()) {
980+
logger.debug("executeWorkflow task interrupted before postInvokeWorkflowResult");
981+
return null;
982+
}
971983
postInvokeWorkflowResult(systemDatabase, workflowId, result);
972984
return result;
973985
} catch (DBOSWorkflowExecutionConflictException e) {
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package dev.dbos.transact.issues;
2+
3+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
4+
import static org.junit.jupiter.api.Assertions.assertEquals;
5+
import static org.junit.jupiter.api.Assertions.assertNotNull;
6+
import static org.junit.jupiter.api.Assertions.assertNull;
7+
import static org.junit.jupiter.api.Assertions.assertTrue;
8+
9+
import dev.dbos.transact.DBOS;
10+
import dev.dbos.transact.StartWorkflowOptions;
11+
import dev.dbos.transact.config.DBOSConfig;
12+
import dev.dbos.transact.database.SystemDatabase;
13+
import dev.dbos.transact.utils.DBUtils;
14+
import dev.dbos.transact.workflow.Queue;
15+
import dev.dbos.transact.workflow.Workflow;
16+
import dev.dbos.transact.workflow.WorkflowHandle;
17+
18+
import java.sql.SQLException;
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
22+
import com.zaxxer.hikari.HikariDataSource;
23+
import org.junit.jupiter.api.AfterEach;
24+
import org.junit.jupiter.api.BeforeAll;
25+
import org.junit.jupiter.api.BeforeEach;
26+
import org.junit.jupiter.api.Test;
27+
28+
interface Example {
29+
30+
public void taskWorkflow(int i) throws Exception;
31+
32+
public void parentParallel() throws Exception;
33+
}
34+
35+
class ExampleImpl implements Example {
36+
37+
private final Queue queue;
38+
private Example proxy;
39+
40+
public ExampleImpl(Queue queue) {
41+
this.queue = queue;
42+
}
43+
44+
public void setProxy(Example proxy) {
45+
this.proxy = proxy;
46+
}
47+
48+
@Workflow(name = "task-workflow")
49+
public void taskWorkflow(int i) throws Exception {
50+
System.out.printf("Task %d started%n", i);
51+
Thread.sleep(i * 100);
52+
System.out.printf("Task %d completed%n", i);
53+
}
54+
55+
@Workflow(name = "parent-parallel")
56+
public void parentParallel() throws Exception {
57+
System.out.println("parent-parallel started");
58+
List<WorkflowHandle<Void, Exception>> handles = new ArrayList<>();
59+
for (int i = 0; i < 10; i++) {
60+
final int index = i;
61+
WorkflowHandle<Void, Exception> handle =
62+
DBOS.startWorkflow(
63+
() -> this.proxy.taskWorkflow(index),
64+
new StartWorkflowOptions().withQueue(this.queue));
65+
handles.add(handle);
66+
}
67+
System.out.println("parent-parallel submitted all child tasks");
68+
for (WorkflowHandle<Void, Exception> handle : handles) {
69+
try {
70+
handle.getResult();
71+
} catch (Exception e) {
72+
System.out.println("Task failed " + e);
73+
throw e;
74+
}
75+
}
76+
System.out.println("parent-parallel completed");
77+
}
78+
}
79+
80+
// @org.junit.jupiter.api.Timeout(value = 2, unit = TimeUnit.MINUTES)
81+
public class Issue218 {
82+
private static DBOSConfig dbosConfig;
83+
private HikariDataSource dataSource;
84+
85+
@BeforeAll
86+
static void onetimeSetup() throws Exception {
87+
dbosConfig =
88+
DBOSConfig.defaultsFromEnv("systemdbtest")
89+
.withDatabaseUrl("jdbc:postgresql://localhost:5432/dbos_java_sys")
90+
.withMaximumPoolSize(2);
91+
}
92+
93+
@BeforeEach
94+
void beforeEachTest() throws SQLException {
95+
DBUtils.recreateDB(dbosConfig);
96+
DBOS.reinitialize(dbosConfig);
97+
dataSource = SystemDatabase.createDataSource(dbosConfig);
98+
}
99+
100+
@AfterEach
101+
void afterEachTest() throws Exception {
102+
DBOS.shutdown();
103+
}
104+
105+
@Test
106+
void issue218() throws Exception {
107+
108+
var queue = new Queue("test-queue");
109+
DBOS.registerQueue(queue);
110+
var impl = new ExampleImpl(queue);
111+
Example proxy = DBOS.registerWorkflows(Example.class, impl);
112+
impl.setProxy(proxy);
113+
114+
DBOS.launch();
115+
116+
var handle = DBOS.startWorkflow(() -> proxy.parentParallel());
117+
var wfid = handle.workflowId();
118+
DBOS.shutdown();
119+
120+
var rows = DBUtils.getWorkflowRows(dataSource);
121+
for (var row : rows) {
122+
var expected = row.workflowId().equals(wfid) ? "PENDING" : "ENQUEUED";
123+
assertEquals(expected, row.status());
124+
}
125+
126+
var steps = DBUtils.getStepRows(dataSource, wfid);
127+
for (var step : steps) {
128+
assertNull(step.output());
129+
assertNull(step.error());
130+
assertTrue(step.childWorkflowId().startsWith(wfid));
131+
}
132+
133+
DBOS.launch();
134+
assertDoesNotThrow(() -> DBOS.getResult(wfid));
135+
136+
rows = DBUtils.getWorkflowRows(dataSource);
137+
for (var row : rows) {
138+
assertEquals("SUCCESS", row.status());
139+
}
140+
141+
steps = DBUtils.getStepRows(dataSource, wfid);
142+
for (var step : steps) {
143+
assertTrue(
144+
step.functionName().equals("task-workflow")
145+
|| step.functionName().equals("DBOS.getResult"));
146+
assertNull(step.error());
147+
assertTrue(step.childWorkflowId().startsWith(wfid));
148+
if (step.functionName().equals("task-workflow")) {
149+
assertNull(step.output());
150+
assertNull(step.startedAt());
151+
assertNull(step.completedAt());
152+
}
153+
if (step.functionName().equals("DBOS.getResult")) {
154+
assertNotNull(step.output());
155+
assertNotNull(step.startedAt());
156+
assertNotNull(step.completedAt());
157+
}
158+
}
159+
}
160+
}

transact/src/test/java/dev/dbos/transact/utils/OperationOutputRow.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ public record OperationOutputRow(
99
String output,
1010
String error,
1111
String functionName,
12-
String childWorkflowId) {
12+
String childWorkflowId,
13+
Long startedAt,
14+
Long completedAt) {
1315

1416
public OperationOutputRow(ResultSet rs) throws SQLException {
1517
this(
@@ -18,6 +20,8 @@ public OperationOutputRow(ResultSet rs) throws SQLException {
1820
rs.getString("output"),
1921
rs.getString("error"),
2022
rs.getString("function_name"),
21-
rs.getString("child_workflow_id"));
23+
rs.getString("child_workflow_id"),
24+
rs.getObject("started_at_epoch_ms", Long.class),
25+
rs.getObject("completed_at_epoch_ms", Long.class));
2226
}
2327
}

0 commit comments

Comments
 (0)