Skip to content

Commit 7beee8a

Browse files
authored
Fix startWorkflow (#234)
`startWorkflow` requires information about the @workflow executed in the provided lambda. Previously, we added the start options to the context and executed the workflow. The @workflow info would end up in invokeWorkflow where the behavior was different based on the presence of start options. This approach creates thread pool exhaustion when starting 1000s of workflows and creates significant complexity in `DBOSContext` and `invokeWorkflow` to account for this difference in execution. This PR introduces a ThreadLocal hook method that `startWorkflow` can set and `DBOSInvocationHandler` can call. This allows `startWorkflow` to retrieve the class/instance/workflow names and the workflow argument from the provided lambda without requiring a thread pool thread. Once we have this information, we can call the private `executeWorkflow` method directly. This simplifies the context significantly and eliminates the need for `invokeWorkflow to do dual duty. This PR also changes `executeWorkflow` to use `CompletableFuture.supplyAsync` instead of `executorService.submit` to run the workflow. This will allow us to leverage different executors available on later versions of Java. However, this work will be in a separate PR. Note, the only changes to the tests are * Consistent use of DBOSAwaitedWorkflowCancelledException instead of CancellationException * `parentTimeoutInheritedByChild` no longer requires retries fixes #209 fixes #227
1 parent 2c407e7 commit 7beee8a

14 files changed

Lines changed: 297 additions & 296 deletions

File tree

transact/src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,7 @@ public static Collection<RegisteredWorkflowInstance> getRegisteredWorkflowInstan
679679
*/
680680
public static WorkflowHandle<?, ?> startWorkflow(
681681
RegisteredWorkflow regWorkflow, Object[] args, StartWorkflowOptions options) {
682-
return executor("executeWorkflow").executeWorkflow(regWorkflow, args, options, null, null);
682+
return executor("startWorkflow").startWorkflow(regWorkflow, args, options);
683683
}
684684

685685
/**

transact/src/main/java/dev/dbos/transact/DBOSClient.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -119,23 +119,31 @@ public record EnqueueOptions(
119119
public EnqueueOptions {
120120
if (Objects.requireNonNull(workflowName, "EnqueueOptions workflowName must not be null")
121121
.isEmpty()) {
122-
throw new IllegalArgumentException("workflowName must not be empty");
122+
throw new IllegalArgumentException("EnqueueOptions workflowName must not be empty");
123123
}
124124

125125
if (Objects.requireNonNull(queueName, "EnqueueOptions queueName must not be null")
126126
.isEmpty()) {
127-
throw new IllegalArgumentException("queueName must not be empty");
127+
throw new IllegalArgumentException("EnqueueOptions queueName must not be empty");
128128
}
129129

130130
if (Objects.requireNonNull(className, "EnqueueOptions className must not be null")
131131
.isEmpty()) {
132-
throw new IllegalArgumentException("className must not be empty");
132+
throw new IllegalArgumentException("EnqueueOptions className must not be empty");
133133
}
134134

135135
if (instanceName == null) instanceName = "";
136136

137-
if (timeout != null && timeout.isNegative()) {
138-
throw new IllegalArgumentException("timeout must not be negative");
137+
if (timeout != null) {
138+
if (timeout.isNegative() || timeout.isZero()) {
139+
throw new IllegalArgumentException(
140+
"EnqueueOptions timeout must be a positive non-zero duration");
141+
}
142+
143+
if (deadline != null) {
144+
throw new IllegalArgumentException(
145+
"EnqueueOptions timeout and deadline cannot both be set");
146+
}
139147
}
140148
}
141149

@@ -340,19 +348,18 @@ public <T, E extends Exception> WorkflowHandle<T, E> enqueueWorkflow(
340348
Objects.requireNonNullElse(options.instanceName(), ""),
341349
null,
342350
args,
343-
new StartWorkflowOptions(
351+
new DBOSExecutor.ExecutionOptions(
344352
Objects.requireNonNullElseGet(options.workflowId(), () -> UUID.randomUUID().toString()),
345353
Timeout.of(options.timeout()),
354+
options.deadline,
346355
Objects.requireNonNull(
347356
options.queueName(), "EnqueueOptions queueName must not be null"),
348357
options.deduplicationId,
349-
options.priority,
350-
options.deadline),
358+
options.priority),
351359
null,
352360
null,
353361
options.appVersion,
354-
systemDatabase,
355-
null);
362+
systemDatabase);
356363
}
357364

358365
/**

transact/src/main/java/dev/dbos/transact/StartWorkflowOptions.java

Lines changed: 31 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,21 @@
2727
public record StartWorkflowOptions(
2828
String workflowId,
2929
Timeout timeout,
30+
Instant deadline,
3031
String queueName,
3132
String deduplicationId,
32-
Integer priority,
33-
Instant deadline) {
33+
Integer priority) {
3434

3535
public StartWorkflowOptions {
3636
if (timeout instanceof Timeout.Explicit explicit) {
3737
if (explicit.value().isNegative() || explicit.value().isZero()) {
38-
throw new IllegalArgumentException("timeout must be a positive non-zero duration");
38+
throw new IllegalArgumentException(
39+
"StartWorkflowOptions explicit timeout must be a positive non-zero duration");
40+
}
41+
42+
if (deadline != null) {
43+
throw new IllegalArgumentException(
44+
"StartWorkflowOptions explicit timeout and deadline cannot both be set");
3945
}
4046
}
4147
}
@@ -55,36 +61,21 @@ public StartWorkflowOptions withWorkflowId(String workflowId) {
5561
return new StartWorkflowOptions(
5662
workflowId,
5763
this.timeout,
64+
this.deadline,
5865
this.queueName,
5966
this.deduplicationId,
60-
this.priority,
61-
this.deadline);
67+
this.priority);
6268
}
6369

6470
/** Produces a new StartWorkflowOptions that overrides timeout value for the started workflow */
6571
public StartWorkflowOptions withTimeout(Timeout timeout) {
66-
if (timeout != null && this.deadline != null) {
67-
throw new IllegalArgumentException(
68-
"should not specify a timeout if the deadline is already set");
69-
}
7072
return new StartWorkflowOptions(
7173
this.workflowId,
7274
timeout,
75+
this.deadline,
7376
this.queueName,
7477
this.deduplicationId,
75-
this.priority,
76-
this.deadline);
77-
}
78-
79-
/** Produces a new StartWorkflowOptions that overrides deadline value for the started workflow */
80-
public StartWorkflowOptions withDeadline(Instant deadline) {
81-
return new StartWorkflowOptions(
82-
this.workflowId,
83-
this.timeout,
84-
this.queueName,
85-
this.deduplicationId,
86-
this.priority,
87-
deadline);
78+
this.priority);
8879
}
8980

9081
/** Produces a new StartWorkflowOptions that overrides timeout value for the started workflow */
@@ -102,16 +93,27 @@ public StartWorkflowOptions withNoTimeout() {
10293
return new StartWorkflowOptions(
10394
this.workflowId,
10495
Timeout.none(),
96+
this.deadline,
10597
this.queueName,
10698
this.deduplicationId,
107-
this.priority,
108-
this.deadline);
99+
this.priority);
100+
}
101+
102+
/** Produces a new StartWorkflowOptions that overrides deadline value for the started workflow */
103+
public StartWorkflowOptions withDeadline(Instant deadline) {
104+
return new StartWorkflowOptions(
105+
this.workflowId,
106+
this.timeout,
107+
deadline,
108+
this.queueName,
109+
this.deduplicationId,
110+
this.priority);
109111
}
110112

111113
/** Produces a new StartWorkflowOptions that assigns the started workflow to a queue */
112114
public StartWorkflowOptions withQueue(String queue) {
113115
return new StartWorkflowOptions(
114-
this.workflowId, this.timeout, queue, this.deduplicationId, this.priority, this.deadline);
116+
this.workflowId, this.timeout, this.deadline, queue, this.deduplicationId, this.priority);
115117
}
116118

117119
/** Produces a new StartWorkflowOptions that assigns the started workflow to a queue */
@@ -127,10 +129,10 @@ public StartWorkflowOptions withDeduplicationId(String deduplicationId) {
127129
return new StartWorkflowOptions(
128130
this.workflowId,
129131
this.timeout,
132+
this.deadline,
130133
this.queueName,
131134
deduplicationId,
132-
this.priority,
133-
this.deadline);
135+
this.priority);
134136
}
135137

136138
/**
@@ -141,23 +143,15 @@ public StartWorkflowOptions withPriority(Integer priority) {
141143
return new StartWorkflowOptions(
142144
this.workflowId,
143145
this.timeout,
146+
this.deadline,
144147
this.queueName,
145148
this.deduplicationId,
146-
priority,
147-
this.deadline);
149+
priority);
148150
}
149151

150152
/** Get the assigned workflow ID, replacing empty with null */
151153
@Override
152154
public String workflowId() {
153155
return workflowId != null && workflowId.isEmpty() ? null : workflowId;
154156
}
155-
156-
/** Get timeout duration */
157-
public Duration getTimeoutDuration() {
158-
if (timeout instanceof Timeout.Explicit e) {
159-
return e.value();
160-
}
161-
return null;
162-
}
163157
}

transact/src/main/java/dev/dbos/transact/context/DBOSContext.java

Lines changed: 0 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import java.time.Duration;
77
import java.time.Instant;
8-
import java.util.Objects;
98
import java.util.concurrent.CompletableFuture;
109

1110
public class DBOSContext {
@@ -14,8 +13,6 @@ public class DBOSContext {
1413
String nextWorkflowId;
1514
Timeout nextTimeout;
1615
Instant nextDeadline;
17-
StartWorkflowOptions startOptions;
18-
String startedWorkflowId;
1916

2017
// current workflow fields
2118
private final String workflowId;
@@ -24,7 +21,6 @@ public class DBOSContext {
2421
private final WorkflowInfo parent;
2522
private final Duration timeout;
2623
private final Instant deadline;
27-
private CompletableFuture<String> startWorkflowFuture;
2824

2925
// private StepStatus stepStatus;
3026

@@ -58,14 +54,6 @@ public DBOSContext(
5854
this.parent = other.parent;
5955
this.timeout = other.timeout;
6056
this.deadline = other.deadline;
61-
62-
if (other.startedWorkflowId != null) {
63-
throw new IllegalStateException("startedWorkflowId not null");
64-
}
65-
66-
this.startOptions = options;
67-
this.startWorkflowFuture = future;
68-
this.startedWorkflowId = null;
6957
}
7058

7159
public boolean isInWorkflow() {
@@ -109,9 +97,6 @@ public String getNextWorkflowId() {
10997
}
11098

11199
public String getNextWorkflowId(String workflowId) {
112-
if (startOptions != null && startOptions.workflowId() != null) {
113-
return startOptions.workflowId();
114-
}
115100
if (nextWorkflowId != null) {
116101
var value = nextWorkflowId;
117102
this.nextWorkflowId = null;
@@ -122,10 +107,6 @@ public String getNextWorkflowId(String workflowId) {
122107
}
123108

124109
public Timeout getNextTimeout() {
125-
if (startOptions != null && startOptions.timeout() != null) {
126-
return startOptions.timeout();
127-
}
128-
129110
return nextTimeout;
130111
}
131112

@@ -134,66 +115,13 @@ public Duration getTimeout() {
134115
}
135116

136117
public Instant getNextDeadline() {
137-
if (startOptions != null && startOptions.deadline() != null) {
138-
return startOptions.deadline();
139-
}
140-
141118
return nextDeadline;
142119
}
143120

144121
public Instant getDeadline() {
145122
return deadline;
146123
}
147124

148-
public String getQueueName() {
149-
if (startOptions != null) {
150-
return startOptions.queueName();
151-
}
152-
return null;
153-
}
154-
155-
public String getDeduplicationId() {
156-
if (startOptions != null) {
157-
return startOptions.deduplicationId();
158-
}
159-
return null;
160-
}
161-
162-
public Integer getPriority() {
163-
if (startOptions != null) {
164-
return startOptions.priority();
165-
}
166-
return null;
167-
}
168-
169-
public void setStartOptions(StartWorkflowOptions options, CompletableFuture<String> future) {
170-
if (startedWorkflowId != null) {
171-
throw new IllegalStateException();
172-
}
173-
startOptions = options;
174-
startWorkflowFuture = future;
175-
}
176-
177-
public boolean validateStartedWorkflow() {
178-
return startOptions == null || startedWorkflowId == null;
179-
}
180-
181-
public void setStartedWorkflowId(String workflowId) {
182-
if (startOptions != null) {
183-
if (startedWorkflowId != null) {
184-
throw new IllegalStateException(
185-
String.format(
186-
"more than one workflow called from start workflow lambda: %s %s",
187-
workflowId, startedWorkflowId));
188-
}
189-
startedWorkflowId = Objects.requireNonNull(workflowId, "workflowId must not be null");
190-
}
191-
}
192-
193-
public CompletableFuture<String> getStartWorkflowFuture() {
194-
return this.startWorkflowFuture;
195-
}
196-
197125
public static String workflowId() {
198126
var ctx = DBOSContextHolder.get();
199127
return ctx == null ? null : ctx.workflowId;

transact/src/main/java/dev/dbos/transact/context/WorkflowOptions.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
public record WorkflowOptions(String workflowId, Timeout timeout, Instant deadline) {
2020

2121
public WorkflowOptions {
22-
if (timeout != null && timeout instanceof Timeout.Explicit explicit) {
22+
if (timeout instanceof Timeout.Explicit explicit) {
2323
if (explicit.value().isNegative() || explicit.value().isZero()) {
24-
throw new IllegalArgumentException("timeout must be a positive non-zero duration");
24+
throw new IllegalArgumentException(
25+
"WorkflowOptions explicit timeout must be a positive non-zero duration");
2526
}
26-
if (deadline != null && timeout instanceof Timeout.Explicit) {
27+
28+
if (deadline != null) {
2729
throw new IllegalArgumentException(
28-
"WorkflowOptions may not specify both `timeout` and `deadline`");
30+
"WorkflowOptions explicit timeout and deadline cannot both be set");
2931
}
3032
}
3133
}

0 commit comments

Comments
 (0)