Skip to content

Commit f938882

Browse files
authored
Merge pull request #15 from dbos-inc/manoj/sleep
Manoj/sleep
2 parents 9c6074e + 96e9350 commit f938882

11 files changed

Lines changed: 447 additions & 88 deletions

File tree

DEVELOPING.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# dbos-transact-java
2+
DBOS Transact Java SDK
3+
4+
## Setting up dev environment
5+
6+
Install a recent OpenJDK. I use OpenJDK 21.
7+
https://adoptium.net/en-GB/temurin/releases/?os=any&arch=any&version=21
8+
9+
Recommended IDE IntelliJ (Community edition is fine).
10+
But feel free to use vi or VSCode, if you are more comfortable with it.
11+
12+
Postgres docker container with
13+
localhost
14+
port 5432
15+
user postgres
16+
17+
export PGPASSWORD = password for postgres user
18+
19+
## build
20+
21+
./gradlew clean build
22+
23+
## run tests
24+
25+
./gradlew clean test
26+
27+
## publish to local maven repository
28+
29+
./gradlew publishToMavenLocal
30+
31+
## import transact into your application
32+
33+
Add to your build.gradle.kts
34+
35+
implementation("dev.dbos:transact:1.0-SNAPSHOT")
36+
implementation("ch.qos.logback:logback-classic:1.5.6")
37+
38+
Annotations @Workflow, @Transaction, @Step need to be on implementation class methods.

README.md

Lines changed: 204 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,222 @@
1-
# dbos-transact-java
2-
DBOS Transact Java SDK
1+
<div align="center">
32

4-
## Setting up dev environment
3+
# DBOS Transact: Lightweight Durable Workflows
54

6-
Install a recent OpenJDK. I use OpenJDK 21.
7-
https://adoptium.net/en-GB/temurin/releases/?os=any&arch=any&version=21
5+
#### [Documentation](https://docs.dbos.dev/) &nbsp;&nbsp;&nbsp;&nbsp; [Examples](https://docs.dbos.dev/examples) &nbsp;&nbsp;&nbsp;&nbsp; [Github](https://github.com/dbos-inc) &nbsp;&nbsp;&nbsp;&nbsp; [Discord](https://discord.com/invite/jsmC6pXGgX)
6+
</div>
87

9-
Recommended IDE IntelliJ (Community edition is fine).
10-
But feel free to use vi or VSCode, if you are more comfortable with it.
8+
---
119

12-
Postgres docker container with
13-
localhost
14-
port 5432
15-
user postgres
10+
## What is DBOS?
1611

17-
export PGPASSWORD = password for postgres user
12+
DBOS provides lightweight durable workflows built on top of Postgres.
13+
Instead of managing your own workflow orchestrator or task queue system, you can use DBOS to add durable workflows and queues to your program in just a few lines of code.
1814

19-
## build
15+
To get started, follow the [quickstart](https://docs.dbos.dev/quickstart) to install this open-source library and connect it to a Postgres database.
16+
Then, annotate workflows and steps in your program to make it durable!
17+
That's all you need to do&mdash;DBOS is entirely contained in this open-source library, there's no additional infrastructure for you to configure or manage.
2018

21-
./gradlew clean build
19+
## When Should I Use DBOS?
2220

23-
## run tests
21+
You should consider using DBOS if your application needs to **reliably handle failures**.
22+
For example, you might be building a payments service that must reliably process transactions even if servers crash mid-operation, or a long-running data pipeline that needs to resume seamlessly from checkpoints rather than restart from the beginning when interrupted.
2423

25-
./gradlew clean test
24+
Handling failures is costly and complicated, requiring complex state management and recovery logic as well as heavyweight tools like external orchestration services.
25+
DBOS makes it simpler: annotate your code to checkpoint it in Postgres and automatically recover from any failure.
26+
DBOS also provides powerful Postgres-backed primitives that makes it easier to write and operate reliable code, including durable queues, notifications, scheduling, event processing, and programmatic workflow management.
2627

27-
## publish to local maven repository
28+
## Features
2829

29-
./gradlew publishToMavenLocal
30+
<details open><summary><strong>💾 Durable Workflows</strong></summary>
3031

31-
## import transact into your application
32+
####
3233

33-
Add to your build.gradle.kts
34+
DBOS workflows make your program **durable** by checkpointing its state in Postgres.
35+
If your program ever fails, when it restarts all your workflows will automatically resume from the last completed step.
36+
37+
You add durable workflows to your existing Java program by annotating ordinary functions as workflows and steps:
38+
39+
```java
40+
41+
public interface SimpleWorkflowService {
42+
43+
void setSimpleWorkflowService(SimpleWorkflowService e);
44+
String exampleWorkflow(String input) ;
45+
void stepOne() ;
46+
void stepTwo() ;
47+
}
48+
49+
public class SimpleWorkflowServiceImpl implements SimpleWorkflowService {
50+
51+
public void setSimpleWorkflowService(SimpleWorkflowService simpleWorkflow) {
52+
this.simpleWorkflowService = simpleWorkflow;
53+
}
54+
55+
@Workflow(name = "exampleWorkflow")
56+
public String exampleWorkflow(String input) {
57+
return input + input;
58+
}
59+
@Step(name = "stepOne")
60+
public void stepOne() {
61+
logger.info("Executed stepOne") ;
62+
}
63+
@Step(name = "stepTwo")
64+
public void stepTwo() {
65+
logger.info("Executed stepTwo") ;
66+
}
67+
68+
}
69+
70+
public class Demo {
71+
72+
public static void main(String[] args) {
73+
74+
DBOSConfig dbosConfig = new DBOSConfig.Builder()
75+
.name("demo")
76+
.dbHost("localhost")
77+
.dbPort(5432)
78+
.dbUser("postgres")
79+
.sysDbName("demo_dbos_sys")
80+
.build() ;
81+
82+
DBOS.initialize(dbosConfig);
83+
DBOS dbos = DBOS.getInstance();
84+
dbos.launch();
85+
86+
SimpleWorkflowService syncExample = dbos.<SimpleWorkflowService>Workflow()
87+
.interfaceClass(SimpleWorkflowService.class)
88+
.implementation(new SimpleWorkflowServiceImpl())
89+
.build();
90+
syncExample.setSimpleWorkflowService(syncExample);
91+
92+
String output = syncExample.exampleWorkflow("HelloDBOS") ;
93+
System.out.println("Sync result: " + output);
94+
}
95+
}
96+
97+
98+
```
99+
100+
Workflows are particularly useful for
101+
102+
- Orchestrating business processes so they seamlessly recover from any failure.
103+
- Building observable and fault-tolerant data pipelines.
104+
- Operating an AI agent, or any application that relies on unreliable or non-deterministic APIs.
105+
106+
[Read more ↗️]()
107+
108+
</details>
109+
110+
<details><summary><strong>📒 Durable Queues</strong></summary>
111+
112+
####
113+
114+
DBOS queues help you **durably** run tasks in the background.
115+
You can enqueue a task (which can be a single step or an entire workflow) from a durable workflow and one of your processes will pick it up for execution.
116+
DBOS manages the execution of your tasks: it guarantees that tasks complete, and that their callers get their results without needing to resubmit them, even if your application is interrupted.
117+
118+
Queues also provide flow control, so you can limit the concurrency of your tasks on a per-queue or per-process basis.
119+
You can also set timeouts for tasks, rate limit how often queued tasks are executed, deduplicate tasks, or prioritize tasks.
120+
121+
You can add queues to your workflows in just a couple lines of code.
122+
They don't require a separate queueing service or message broker&mdash;just Postgres.
123+
124+
```java
125+
126+
127+
public void queuedTasks() {
128+
Queue q = new DBOS.QueueBuilder("childQ").build();
129+
130+
for (int i = 0; i < 3; i++) {
131+
132+
String wid = "child" + i;
133+
DBOSOptions options = new DBOSOptions.Builder(wid).queue(q).build();
134+
try (SetDBOSOptions o = new SetDBOSOptions(options)) {
135+
simpleService.childWorkflow(wid);
136+
}
137+
}
138+
139+
for (int i = 0 ; i < 3 ; i++) {
140+
String wid = "child"+i;
141+
WorkflowHandle h = DBOS.retrieveWorkflow(wid);
142+
System.out.println(h.getResult());
143+
}
144+
}
145+
```
146+
147+
[Read more ↗️](https://docs.dbos.dev/python/tutorials/queue-tutorial)
148+
149+
</details>
150+
151+
152+
## Getting Started
153+
154+
To get started, follow the [quickstart](https://docs.dbos.dev/quickstart) to install this open-source library and connect it to a Postgres database.
155+
Then, check out the [programming guide](https://docs.dbos.dev/python/programming-guide) to learn how to build with durable workflows and queues.
156+
157+
## Documentation
158+
159+
[https://docs.dbos.dev](https://docs.dbos.dev)
160+
161+
## Examples
162+
163+
[https://docs.dbos.dev/examples](https://docs.dbos.dev/examples)
164+
165+
## DBOS vs. Other Systems
166+
167+
<details><summary><strong>DBOS vs. Temporal</strong></summary>
168+
169+
####
170+
171+
Both DBOS and Temporal provide durable execution, but DBOS is implemented in a lightweight Postgres-backed library whereas Temporal is implemented in an externally orchestrated server.
172+
173+
You can add DBOS to your program by installing this open-source library, connecting it to Postgres, and annotating workflows and steps.
174+
By contrast, to add Temporal to your program, you must rearchitect your program to move your workflows and steps (activities) to a Temporal worker, configure a Temporal server to orchestrate those workflows, and access your workflows only through a Temporal client.
175+
[This blog post](https://www.dbos.dev/blog/durable-execution-coding-comparison) makes the comparison in more detail.
176+
177+
**When to use DBOS:** You need to add durable workflows to your applications with minimal rearchitecting, or you are using Postgres.
178+
179+
**When to use Temporal:** You don't want to add Postgres to your stack, or you need a language DBOS doesn't support yet.
180+
181+
</details>
182+
183+
<details><summary><strong>DBOS vs. Airflow</strong></summary>
184+
185+
####
186+
187+
DBOS and Airflow both provide workflow abstractions.
188+
Airflow is targeted at data science use cases, providing many out-of-the-box connectors but requiring workflows be written as explicit DAGs and externally orchestrating them from an Airflow cluster.
189+
Airflow is designed for batch operations and does not provide good performance for streaming or real-time use cases.
190+
DBOS is general-purpose, but is often used for data pipelines, allowing developers to write workflows as code and requiring no infrastructure except Postgres.
191+
192+
**When to use DBOS:** You need the flexibility of writing workflows as code, or you need higher performance than Airflow is capable of (particularly for streaming or real-time use cases).
193+
194+
**When to use Airflow:** You need Airflow's ecosystem of connectors.
195+
196+
</details>
197+
198+
<details><summary><strong>DBOS vs. Celery/BullMQ</strong></summary>
199+
200+
####
201+
202+
DBOS provides a similar queue abstraction to dedicated queueing systems like Celery or BullMQ: you can declare queues, submit tasks to them, and control their flow with concurrency limits, rate limits, timeouts, prioritization, etc.
203+
However, DBOS queues are **durable and Postgres-backed** and integrate with durable workflows.
204+
For example, in DBOS you can write a durable workflow that enqueues a thousand tasks and waits for their results.
205+
DBOS checkpoints the workflow and each of its tasks in Postgres, guaranteeing that even if failures or interruptions occur, the tasks will complete and the workflow will collect their results.
206+
By contrast, Celery/BullMQ are Redis-backed and don't provide workflows, so they provide fewer guarantees but better performance.
207+
208+
**When to use DBOS:** You need the reliability of enqueueing tasks from durable workflows.
209+
210+
**When to use Celery/BullMQ**: You don't need durability, or you need very high throughput beyond what your Postgres server can support.
211+
</details>
212+
213+
## Community
214+
215+
If you want to ask questions or hang out with the community, join us on [Discord](https://discord.gg/fMwQjeW5zg)!
216+
If you see a bug or have a feature request, don't hesitate to open an issue here on GitHub.
217+
If you're interested in contributing, check out our [contributions guide](./CONTRIBUTING.md).
34218

35-
implementation("dev.dbos:transact:1.0-SNAPSHOT")
36-
implementation("ch.qos.logback:logback-classic:1.5.6")
37219

38-
Annotations @Workflow, @Transaction, @Step need to be on implementation class methods.
39220

40221

41222

src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,5 +326,19 @@ public void setEvent(String key, Object value) {
326326
public Object getEvent(String workflowId, String key, float timeOut) {
327327
return notificationService.getEvent(workflowId, key, timeOut) ;
328328
}
329+
330+
/**
331+
*
332+
* Durable sleep. When you are in a workflow, use this instead of Thread.sleep.
333+
* On restart or during recovery the original expected wakeup time is honoured as
334+
* opposed to sleeping all over again.
335+
*
336+
* @param seconds in seconds
337+
*/
338+
339+
public void sleep(float seconds) {
340+
341+
this.dbosExecutor.sleep(seconds) ;
342+
}
329343
}
330344

src/main/java/dev/dbos/transact/database/NotificationsDAO.java

Lines changed: 3 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public Object recv(String workflowUuid, int functionId, int timeoutFunctionId,
160160
if (!hasExistingNotification) {
161161
// Wait for the notification
162162
// Support OAOO sleep
163-
double actualTimeout = sleep(workflowUuid, timeoutFunctionId, timeoutSeconds, true);
163+
double actualTimeout = stepsDAO.sleep(workflowUuid, timeoutFunctionId, timeoutSeconds, true);
164164
long timeoutMs = (long) (actualTimeout * 1000);
165165
lockPair.condition.await(timeoutMs, TimeUnit.MILLISECONDS);
166166

@@ -226,61 +226,7 @@ public Object recv(String workflowUuid, int functionId, int timeoutFunctionId,
226226
}
227227
}
228228
}
229-
230-
// TODO : can be moved elsewhere when we implement DBOS.sleep
231-
public double sleep(String workflowUuid, int functionId, double seconds, boolean skipSleep) throws SQLException {
232-
String functionName = "DBOS.sleep";
233-
234-
StepResult recordedOutput = null ;
235-
236-
try (Connection c = dataSource.getConnection()) {
237-
recordedOutput = stepsDAO.checkStepExecutionTxn(workflowUuid, functionId, functionName, c) ;
238-
}
239-
240-
Double endTime;
241-
if (recordedOutput != null) {
242-
logger.debug("Replaying sleep, id: {}, seconds: {}", functionId, seconds);
243-
String output = recordedOutput.getOutput();
244-
if (output == null) {
245-
throw new AssertionError("no recorded end time");
246-
}
247-
endTime = (Double)JSONUtil.deserializeToArray(output)[0] ;
248-
} else {
249-
logger.debug("Running sleep, id: {}, seconds: {}", functionId, seconds);
250-
endTime = System.currentTimeMillis() / 1000.0 + seconds;
251-
252-
try {
253-
254-
StepResult output = new StepResult();
255-
output.setWorkflowId(workflowUuid);
256-
output.setFunctionId(functionId);
257-
output.setFunctionName(functionName);
258-
output.setOutput(JSONUtil.serialize(endTime));
259-
output.setError(null);
260-
261-
try(Connection conn = dataSource.getConnection()) {
262-
stepsDAO.recordStepResultTxn(output, conn);
263-
}
264-
} catch (DBOSWorkflowConflictException e) {
265-
// Ignore conflict - operation already recorded
266-
logger.error("Error recording sleep error ",e);
267-
}
268-
}
269-
270-
double duration = Math.max(0, endTime - (System.currentTimeMillis() / 1000.0));
271-
272-
if (!skipSleep) {
273-
try {
274-
Thread.sleep((long) (duration * 1000));
275-
} catch (InterruptedException e) {
276-
Thread.currentThread().interrupt();
277-
throw new RuntimeException("Sleep interrupted", e);
278-
}
279-
}
280-
281-
return duration;
282-
}
283-
229+
284230
public void setEvent(String workflowId, int functionId, String key, Object message) throws SQLException {
285231
String functionName = "DBOS.setEvent";
286232

@@ -403,7 +349,7 @@ public Object getEvent(String targetUuid, String key, double timeoutSeconds, Get
403349
double actualTimeout = timeoutSeconds;
404350
if (callerCtx != null) {
405351
// Support OAOO sleep for workflows
406-
actualTimeout = sleep(
352+
actualTimeout = stepsDAO.sleep(
407353
callerCtx.getWorkflowId(),
408354
callerCtx.getTimeoutFunctionId(),
409355
timeoutSeconds,

0 commit comments

Comments
 (0)