Skip to content

Commit 2e6f80a

Browse files
authored
Merge pull request #16 from dbos-inc/manoj/timeout
Workflow timeouts changed compile to jdk17
2 parents f938882 + 855f582 commit 2e6f80a

17 files changed

Lines changed: 808 additions & 55 deletions

README.md

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,111 @@ They don't require a separate queueing service or message broker—just Post
148148

149149
</details>
150150

151+
<details><summary><strong>📒 Asynchronous execution </strong></summary>
152+
153+
####
154+
155+
DBOS can run your workflows asynchronously without you needing to make any changes to the interface or implementation.
156+
157+
This is ideal for long-running workflows whose result might not be available immediately.
158+
You code can return at a later point and check the status for completion and/or retrive the result.
159+
160+
161+
```java
162+
163+
164+
public void runAsyncWorkflow() {
165+
166+
SimpleWorkflowService syncExample = dbos.<SimpleWorkflowService>Workflow()
167+
.interfaceClass(SimpleWorkflowService.class)
168+
.implementation(new SimpleWorkflowServiceImpl())
169+
.build();
170+
syncExample.setSimpleWorkflowService(syncExample);
171+
172+
String workflowId = "wf-124";
173+
options = new DBOSOptions.Builder(workflowId).async().build();
174+
try (SetDBOSOptions id = new SetDBOSOptions(options)) {
175+
syncExample.exampleWorkflow("HelloDBOS");
176+
}
177+
178+
WorkflowHandle<String> handle = DBOS.retrieveWorkflow(workflowId); ;
179+
result = handle.getResult();
180+
181+
}
182+
```
183+
184+
[Read more ↗️](https://docs.dbos.dev/python/tutorials/queue-tutorial)
185+
186+
</details>
187+
188+
189+
<details><summary><strong>📅 Durable Scheduling</strong></summary>
190+
191+
####
192+
193+
Schedule workflows using cron syntax, or use durable sleep to pause workflows for as long as you like (even days or weeks) before executing.
194+
195+
You can schedule a workflow using a single annotation:
196+
197+
```python
198+
@DBOS.scheduled('* * * * *') # crontab syntax to run once every minute
199+
@DBOS.workflow()
200+
def example_scheduled_workflow(scheduled_time: datetime, actual_time: datetime):
201+
DBOS.logger.info("I am a workflow scheduled to run once a minute.")
202+
```
203+
204+
You can add a durable sleep to any workflow with a single line of code.
205+
It stores its wakeup time in Postgres so the workflow sleeps through any interruption or restart, then always resumes on schedule.
206+
207+
```java
208+
209+
public class SchedulerImpl {
210+
211+
@Workflow(name = "every5Second")
212+
@Scheduled(cron = "0/5 * * * * ?")
213+
public void every5Second(Instant schedule , Instant actual) {
214+
log.info("Executed workflow "+ schedule.toString() + " " + actual.toString()) ;
215+
}
216+
}
217+
218+
// In your main
219+
// dbos.scheduleWorkflow(new SchedulerImpl());
220+
```
221+
222+
[Read more ↗️](https://docs.dbos.dev/python/tutorials/scheduled-workflows)
223+
224+
</details>
225+
226+
<details><summary><strong>📫 Durable Notifications</strong></summary>
227+
228+
####
229+
230+
Pause your workflow executions until a notification is received, or emit events from your workflow to send progress updates to external clients.
231+
All notifications are stored in Postgres, so they can be sent and received with exactly-once semantics.
232+
Set durable timeouts when waiting for events, so you can wait for as long as you like (even days or weeks) through interruptions or restarts, then resume once a notification arrives or the timeout is reached.
233+
234+
For example, build a reliable billing workflow that durably waits for a notification from a payments service, processing it exactly-once:
235+
236+
```java
237+
@workflow(name = "billing")
238+
public void billingWorkflow() {
239+
// Calculate the charge, then submit the bill to a payments service
240+
String payment_status = (String) dbos.recv(PAYMENT_STATUS, timeout = payment_service_timeout);
241+
if (payment_status.equals("paid")) {
242+
// handle paid
243+
} else {
244+
// handle not paid
245+
}
246+
}
247+
248+
@workflow(name = "payment")
249+
public void payment() {
250+
dbos.send(targetWorkflowId, PAYMENT_STATUS, "paid") ;
251+
}
252+
253+
```
254+
</details>
255+
151256

152257
## Getting Started
153258

build.gradle.kts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,22 @@ plugins {
88
group = "dev.dbos"
99
version = "1.0-SNAPSHOT"
1010

11+
java {
12+
toolchain {
13+
languageVersion = JavaLanguageVersion.of(17)
14+
}
15+
}
1116

1217
tasks.withType<JavaCompile> {
13-
sourceCompatibility = "11"
14-
targetCompatibility = "11"
18+
options.release.set(11) // Targets Java 11 bytecode (RECOMMENDED)
19+
// (Alternative: sourceCompatibility = "11"; targetCompatibility = "11")
1520
}
1621

22+
// tasks.withType<JavaCompile> {
23+
// sourceCompatibility = "11"
24+
// targetCompatibility = "11"
25+
// }
26+
1727
repositories {
1828
mavenCentral()
1929
}

src/main/java/dev/dbos/transact/context/DBOSContext.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ public class DBOSContext {
2222

2323
// workflow timeouts
2424
private Queue queue;
25-
private int workflowTimeoutMs ;
26-
private int workflowDeadlineEpochMs ;
25+
private long workflowTimeoutMs ;
26+
private long workflowDeadlineEpochMs ;
2727

2828
// Queues
2929
private String deduplicationId ;
@@ -45,14 +45,16 @@ public DBOSContext(String workflowId, int functionId) {
4545
this.inWorkflow = false;
4646
}
4747

48-
public DBOSContext(String workflowId, int functionId, String parentWorkflowId, int parentFunctionId,boolean inWorkflow, boolean async, Queue q) {
48+
public DBOSContext(String workflowId, int functionId, String parentWorkflowId, int parentFunctionId,
49+
boolean inWorkflow, boolean async, Queue q, long timeout) {
4950
this.workflowId = workflowId;
5051
this.functionId = functionId ;
5152
this.inWorkflow = inWorkflow;
5253
this.parentWorkflowId = parentWorkflowId;
5354
this.parentFunctionId = parentFunctionId;
5455
this.async = async;
5556
this.queue = q;
57+
this.workflowTimeoutMs = timeout;
5658

5759
}
5860

@@ -62,26 +64,34 @@ public DBOSContext(DBOSOptions options, int functionId) {
6264
this.inWorkflow = false;
6365
this.async = options.isAsync() ;
6466
this.queue = options.getQueue();
67+
this.workflowTimeoutMs = options.getTimeout()*1000;
6568
}
6669

67-
private DBOSContext(String childWorkflowId, String parentWorkflowId, int parentFunctionId, boolean async, Queue queue) {
70+
private DBOSContext(String childWorkflowId, String parentWorkflowId, int parentFunctionId, boolean async, Queue queue, long workflowTimeout) {
6871
this.workflowId = childWorkflowId;
6972
this.parentWorkflowId = parentWorkflowId;
7073
this.parentFunctionId = parentFunctionId;
7174
this.inWorkflow = true;
7275
this.async = async ;
7376
this.queue = queue;
77+
this.workflowTimeoutMs = workflowTimeout;
7478
}
7579

76-
private DBOSContext(DBOSOptions options, String parentWorkflowId, int parentFunctionId) {
80+
private DBOSContext(DBOSOptions options, String parentWorkflowId, int parentFunctionId, long parentTimeout) {
7781
this.workflowId = options.getWorkflowId();
7882
this.parentWorkflowId = parentWorkflowId;
7983
this.parentFunctionId = parentFunctionId;
8084
this.inWorkflow = true;
8185
this.async = options.isAsync();
8286
this.queue = options.getQueue();
87+
if (options.getTimeout() > 0) {
88+
this.workflowTimeoutMs = options.getTimeout()*1000;
89+
} else {
90+
this.workflowTimeoutMs = parentTimeout ;
91+
}
8392
}
8493

94+
8595
public String getWorkflowId() {
8696
return workflowId;
8797
}
@@ -119,15 +129,15 @@ public void setStepId(String stepId) {
119129
}
120130

121131
public DBOSContext copy() {
122-
return new DBOSContext(workflowId, functionId, parentWorkflowId, parentFunctionId, inWorkflow, async, queue);
132+
return new DBOSContext(workflowId, functionId, parentWorkflowId, parentFunctionId, inWorkflow, async, queue, workflowTimeoutMs);
123133
}
124134

125135
public DBOSContext createChild(String childWorkflowId) {
126-
return new DBOSContext(childWorkflowId, workflowId, this.getAndIncrementFunctionId(), this.async, this.getQueue());
136+
return new DBOSContext(childWorkflowId, workflowId, this.getAndIncrementFunctionId(), this.async, this.getQueue(), this.workflowTimeoutMs);
127137
}
128138

129139
public DBOSContext createChild(DBOSOptions options) {
130-
return new DBOSContext(options, workflowId, this.getAndIncrementFunctionId());
140+
return new DBOSContext(options, workflowId, this.getAndIncrementFunctionId(), this.workflowTimeoutMs);
131141
}
132142

133143
public boolean hasParent() {
@@ -149,5 +159,9 @@ public boolean isAsync() {
149159
public Queue getQueue() {
150160
return queue;
151161
}
162+
163+
public long getWorkflowTimeoutMs() {
164+
return workflowTimeoutMs ;
165+
}
152166
}
153167

src/main/java/dev/dbos/transact/context/DBOSOptions.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public class DBOSOptions {
99
private final boolean async;
1010
private final Queue queue;
1111
private final String workflowId;
12-
private final float timeoutSeconds ;
12+
private final long timeoutSeconds ;
1313

1414
private DBOSOptions(Builder builder) {
1515
this.async = builder.async;
@@ -32,15 +32,15 @@ public String getWorkflowId() {
3232
}
3333

3434

35-
public float getTimeout() {
35+
public long getTimeout() {
3636
return timeoutSeconds;
3737
}
3838

3939
public static class Builder {
4040
private boolean async = false;
4141
private Queue queue = null;
4242
private String workflowId = null;
43-
private float timeoutSeconds = 0f;
43+
private long timeoutSeconds = 0;
4444

4545
public Builder(String workflowId) {
4646
this.workflowId = workflowId;
@@ -57,7 +57,7 @@ public Builder queue(Queue queue) {
5757
}
5858

5959

60-
public Builder timeout(float timeout) {
60+
public Builder timeout(long timeout) {
6161
this.timeoutSeconds = timeout;
6262
return this;
6363
}

src/main/java/dev/dbos/transact/database/SystemDatabase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,16 @@ public double sleep(String workflowId, int functionId, double seconds, boolean s
292292

293293
}
294294

295+
public void cancelWorkflow(String workflowId) {
296+
try {
297+
workflowDAO.cancelWorkflow(workflowId);
298+
} catch (SQLException sq) {
299+
logger.error("Sql Exception", sq);
300+
throw new DBOSException(UNEXPECTED.getCode(), sq.getMessage());
301+
}
302+
303+
}
304+
295305
private void createDataSource(String dbName) {
296306
HikariConfig hikariConfig = new HikariConfig();
297307

src/main/java/dev/dbos/transact/database/WorkflowDAO.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,4 +659,45 @@ public void recordChildWorkflow(String parentId,
659659

660660
}
661661

662+
public void cancelWorkflow(String workflowId) throws SQLException {
663+
664+
try (Connection conn = dataSource.getConnection()) {
665+
666+
// Check the status of the workflow. If it is complete, do nothing.
667+
String checkStatusSql = " SELECT status FROM %s.workflow_status WHERE workflow_uuid = ? " ;
668+
checkStatusSql = String.format(checkStatusSql, Constants.DB_SCHEMA);
669+
670+
String currentStatus = null;
671+
try (PreparedStatement stmt = conn.prepareStatement(checkStatusSql)) {
672+
stmt.setString(1, workflowId);
673+
try (ResultSet rs = stmt.executeQuery()) {
674+
if (rs.next()) {
675+
currentStatus = rs.getString("status");
676+
}
677+
}
678+
}
679+
680+
// If workflow doesn't exist or is already complete, do nothing
681+
if (currentStatus == null ||
682+
WorkflowState.SUCCESS.name().equals(currentStatus) ||
683+
WorkflowState.ERROR.name().equals(currentStatus)) {
684+
logger.info("Returning without updating status") ;
685+
return;
686+
}
687+
688+
// Set the workflow's status to CANCELLED and remove it from any queue it is on
689+
String updateSql = "UPDATE %s.workflow_status SET status = ?, " +
690+
" queue_name = NULL, deduplication_id = NULL, started_at_epoch_ms = NULL " +
691+
" WHERE workflow_uuid = ? " ;
692+
updateSql = String.format(updateSql, Constants.DB_SCHEMA) ;
693+
694+
try (PreparedStatement stmt = conn.prepareStatement(updateSql)) {
695+
stmt.setString(1, WorkflowState.CANCELLED.name());
696+
stmt.setString(2, workflowId);
697+
stmt.executeUpdate();
698+
}
699+
700+
}
701+
}
702+
662703
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
package dev.dbos.transact.exceptions;
22

33
public class AwaitedWorkflowCancelledException extends DBOSException {
4+
private String workflowId ;
45
public AwaitedWorkflowCancelledException(String workflowId) {
56
super(ErrorCode.WORKFLOW_CONFLICT.getCode(),
67
String.format("Awaited workflow %s was cancelled.", workflowId));
8+
this.workflowId = workflowId;
9+
}
10+
11+
public String getWorkflowId() {
12+
return workflowId;
713
}
814
}

0 commit comments

Comments
 (0)