Skip to content

Commit 1b412ed

Browse files
authored
Stream support (#342)
adds `readStream`, `writeStream`, `closeStream` APIs for DBOS (and `readStream` for DBOSClient) with associated DBOSExecutor and SystemDatabase changes. Plus tests fixes #215
1 parent e034d97 commit 1b412ed

14 files changed

Lines changed: 1130 additions & 198 deletions

File tree

transact/src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.time.ZoneId;
3636
import java.util.Collection;
3737
import java.util.HashSet;
38+
import java.util.Iterator;
3839
import java.util.List;
3940
import java.util.Objects;
4041
import java.util.Optional;
@@ -1107,4 +1108,49 @@ public static boolean inStep() {
11071108
public static @Nullable SerializationStrategy serializationStrategy() {
11081109
return DBOSContext.serializationStrategy();
11091110
}
1111+
1112+
/**
1113+
* Write a value to a stream. Must be called from within a workflow or step.
1114+
*
1115+
* @param key The stream key / name within the workflow
1116+
* @param value A serializable value to write to the stream
1117+
*/
1118+
public void writeStream(@NonNull String key, @NonNull Object value) {
1119+
writeStream(key, value, null);
1120+
}
1121+
1122+
/**
1123+
* Write a value to a stream with a specific serialization strategy. Must be called from within a
1124+
* workflow or step.
1125+
*
1126+
* @param key The stream key / name within the workflow
1127+
* @param value A serializable value to write to the stream
1128+
* @param serialization The serialization strategy to use (null for workflow default)
1129+
*/
1130+
public void writeStream(
1131+
@NonNull String key, @NonNull Object value, @Nullable SerializationStrategy serialization) {
1132+
ensureLaunched("writeStream").writeStream(key, value, serialization);
1133+
}
1134+
1135+
/**
1136+
* Close a stream. Must be called from within a workflow, not a step.
1137+
*
1138+
* @param key The stream key / name within the workflow
1139+
*/
1140+
public void closeStream(@NonNull String key) {
1141+
ensureLaunched("closeStream").closeStream(key);
1142+
}
1143+
1144+
/**
1145+
* Read values from a stream as an iterator. This function reads values from a stream identified
1146+
* by the workflow_id and key, returning an iterator that yields each value in order until the
1147+
* stream is closed or the workflow terminates.
1148+
*
1149+
* @param workflowId The workflow instance ID that owns the stream
1150+
* @param key The stream key / name within the workflow
1151+
* @return Iterator that yields each value in the stream
1152+
*/
1153+
public @NonNull Iterator<Object> readStream(@NonNull String workflowId, @NonNull String key) {
1154+
return ensureLaunched("readStream").readStream(workflowId, key);
1155+
}
11101156
}

transact/src/main/java/dev/dbos/transact/DBOSClient.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package dev.dbos.transact;
22

33
import dev.dbos.transact.database.Result;
4+
import dev.dbos.transact.database.StreamIterator;
45
import dev.dbos.transact.database.SystemDatabase;
56
import dev.dbos.transact.execution.DBOSExecutor;
67
import dev.dbos.transact.json.DBOSSerializer;
@@ -20,6 +21,7 @@
2021
import java.time.Duration;
2122
import java.time.Instant;
2223
import java.time.ZoneId;
24+
import java.util.Iterator;
2325
import java.util.List;
2426
import java.util.Map;
2527
import java.util.Objects;
@@ -642,6 +644,19 @@ public void send(
642644
return Optional.ofNullable(systemDatabase.getEvent(targetId, key, timeout, null));
643645
}
644646

647+
/**
648+
* Read values from a stream as an iterator. This function reads values from a stream identified
649+
* by the workflow_id and key, returning an iterator that yields each value in order until the
650+
* stream is closed or the workflow terminates.
651+
*
652+
* @param workflowId The workflow instance ID that owns the stream
653+
* @param key The stream key / name within the workflow
654+
* @return Iterator that yields each value in the stream
655+
*/
656+
public @NonNull Iterator<Object> readStream(@NonNull String workflowId, @NonNull String key) {
657+
return new StreamIterator(workflowId, key, systemDatabase);
658+
}
659+
645660
/**
646661
* Create a handle for a workflow. This call does not ensure that the workflow exists; use the
647662
* returned handle's `getStatus()`.
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package dev.dbos.transact.database;
2+
3+
import dev.dbos.transact.workflow.WorkflowState;
4+
import dev.dbos.transact.workflow.WorkflowStatus;
5+
6+
import java.util.Iterator;
7+
import java.util.NoSuchElementException;
8+
9+
public class StreamIterator implements Iterator<Object> {
10+
private final String workflowId;
11+
private final String key;
12+
private final SystemDatabase systemDatabase;
13+
private int offset = 0;
14+
private Object nextValue = null;
15+
private boolean finished = false;
16+
17+
public StreamIterator(String workflowId, String key, SystemDatabase systemDatabase) {
18+
this.workflowId = workflowId;
19+
this.key = key;
20+
this.systemDatabase = systemDatabase;
21+
advance();
22+
}
23+
24+
private void advance() {
25+
while (!finished) {
26+
try {
27+
Object value = systemDatabase.readStream(workflowId, key, offset);
28+
nextValue = value;
29+
offset++;
30+
return;
31+
} catch (IllegalArgumentException e) {
32+
WorkflowStatus status = systemDatabase.getWorkflowStatus(workflowId);
33+
if (status == null || !isWorkflowActive(status.status())) {
34+
finished = true;
35+
nextValue = null;
36+
return;
37+
}
38+
try {
39+
Thread.sleep(1000);
40+
} catch (InterruptedException ie) {
41+
Thread.currentThread().interrupt();
42+
finished = true;
43+
nextValue = null;
44+
return;
45+
}
46+
} catch (IllegalStateException e) {
47+
finished = true;
48+
nextValue = null;
49+
return;
50+
}
51+
}
52+
}
53+
54+
private boolean isWorkflowActive(WorkflowState state) {
55+
return WorkflowState.PENDING == state || WorkflowState.ENQUEUED == state;
56+
}
57+
58+
@Override
59+
public boolean hasNext() {
60+
return nextValue != null;
61+
}
62+
63+
@Override
64+
public Object next() {
65+
if (nextValue == null) {
66+
throw new NoSuchElementException();
67+
}
68+
Object result = nextValue;
69+
advance();
70+
return result;
71+
}
72+
}
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
package dev.dbos.transact.database;
2+
3+
import dev.dbos.transact.json.SerializationUtil;
4+
import dev.dbos.transact.workflow.internal.StepResult;
5+
6+
import java.sql.Connection;
7+
import java.sql.SQLException;
8+
import java.util.ArrayList;
9+
import java.util.LinkedHashMap;
10+
import java.util.List;
11+
import java.util.Map;
12+
13+
import javax.sql.DataSource;
14+
15+
class StreamsDAO {
16+
17+
private final DataSource dataSource;
18+
private final String schema;
19+
20+
StreamsDAO(DataSource dataSource, String schema) {
21+
this.dataSource = dataSource;
22+
this.schema = schema;
23+
}
24+
25+
public void writeStreamFromStep(
26+
String workflowId, int functionId, String key, Object value, String serializationFormat)
27+
throws SQLException {
28+
try (Connection conn = dataSource.getConnection()) {
29+
insertStream(conn, workflowId, functionId, key, value, serializationFormat);
30+
}
31+
}
32+
33+
public void writeStreamFromWorkflow(
34+
String workflowId, int functionId, String key, Object value, String serializationFormat)
35+
throws SQLException {
36+
String functionName =
37+
STREAM_CLOSED_SENTINEL.equals(value) ? "DBOS.closeStream" : "DBOS.writeStream";
38+
long startTime = System.currentTimeMillis();
39+
40+
try (Connection conn = dataSource.getConnection()) {
41+
conn.setAutoCommit(false);
42+
43+
try {
44+
StepResult recordedOutput =
45+
StepsDAO.checkStepExecutionTxn(workflowId, functionId, functionName, conn, schema);
46+
47+
if (recordedOutput != null) {
48+
logger.debug("Replaying writeStream, id: {}, key: {}", functionId, key);
49+
conn.commit();
50+
return;
51+
} else {
52+
logger.debug("Running writeStream, id: {}, key: {}", functionId, key);
53+
}
54+
55+
insertStream(conn, workflowId, functionId, key, value, serializationFormat);
56+
57+
var output = new StepResult(workflowId, functionId, functionName, null, null, null, null);
58+
StepsDAO.recordStepResultTxn(output, startTime, System.currentTimeMillis(), conn, schema);
59+
60+
conn.commit();
61+
62+
} catch (Exception e) {
63+
try {
64+
conn.rollback();
65+
} catch (SQLException rollbackEx) {
66+
e.addSuppressed(rollbackEx);
67+
}
68+
throw e;
69+
}
70+
}
71+
}
72+
73+
private void insertStream(
74+
Connection conn,
75+
String workflowId,
76+
int functionId,
77+
String key,
78+
Object value,
79+
String serializationFormat)
80+
throws SQLException {
81+
var serialized = SerializationUtil.serializeValue(value, serializationFormat, null);
82+
int offset = getNextOffsetTx(conn, workflowId, key);
83+
84+
String sql =
85+
"""
86+
INSERT INTO "%s".streams (workflow_uuid, key, value, "offset", function_id, serialization)
87+
VALUES (?, ?, ?, ?, ?, ?)
88+
"""
89+
.formatted(schema);
90+
91+
try (var stmt = conn.prepareStatement(sql)) {
92+
stmt.setString(1, workflowId);
93+
stmt.setString(2, key);
94+
stmt.setString(3, serialized.serializedValue());
95+
stmt.setInt(4, offset);
96+
stmt.setInt(5, functionId);
97+
stmt.setString(6, serialized.serialization());
98+
stmt.executeUpdate();
99+
}
100+
}
101+
102+
private int getNextOffsetTx(Connection conn, String workflowId, String key) throws SQLException {
103+
String sql =
104+
"""
105+
SELECT COALESCE(MAX("offset"), -1) + 1
106+
FROM "%s".streams
107+
WHERE workflow_uuid = ? AND key = ?
108+
"""
109+
.formatted(schema);
110+
111+
try (var stmt = conn.prepareStatement(sql)) {
112+
stmt.setString(1, workflowId);
113+
stmt.setString(2, key);
114+
try (var rs = stmt.executeQuery()) {
115+
if (rs.next()) {
116+
return rs.getInt(1);
117+
}
118+
return 0;
119+
}
120+
}
121+
}
122+
123+
public void closeStream(String workflowId, int functionId, String key) throws SQLException {
124+
writeStreamFromWorkflow(workflowId, functionId, key, STREAM_CLOSED_SENTINEL, "portable_json");
125+
}
126+
127+
public Object readStream(String workflowId, String key, int offset) throws SQLException {
128+
String sql =
129+
"""
130+
SELECT value, serialization
131+
FROM "%s".streams
132+
WHERE workflow_uuid = ? AND key = ? AND "offset" = ?
133+
"""
134+
.formatted(schema);
135+
136+
try (Connection conn = dataSource.getConnection();
137+
var stmt = conn.prepareStatement(sql)) {
138+
stmt.setString(1, workflowId);
139+
stmt.setString(2, key);
140+
stmt.setInt(3, offset);
141+
try (var rs = stmt.executeQuery()) {
142+
if (rs.next()) {
143+
String value = rs.getString("value");
144+
String serialization = rs.getString("serialization");
145+
Object deserialized = SerializationUtil.deserializeValue(value, serialization, null);
146+
if (STREAM_CLOSED_SENTINEL.equals(deserialized)) {
147+
throw new IllegalStateException("Stream closed for key: " + key);
148+
}
149+
return deserialized;
150+
}
151+
throw new IllegalArgumentException(
152+
"No value found for workflow=" + workflowId + ", key=" + key + ", offset=" + offset);
153+
}
154+
}
155+
}
156+
157+
public Map<String, List<Object>> getAllStreamEntries(String workflowId) throws SQLException {
158+
String sql =
159+
"""
160+
SELECT key, value, serialization
161+
FROM "%s".streams
162+
WHERE workflow_uuid = ?
163+
ORDER BY key, "offset"
164+
"""
165+
.formatted(schema);
166+
167+
var streams = new LinkedHashMap<String, List<Object>>();
168+
try (Connection conn = dataSource.getConnection();
169+
var stmt = conn.prepareStatement(sql)) {
170+
stmt.setString(1, workflowId);
171+
try (var rs = stmt.executeQuery()) {
172+
while (rs.next()) {
173+
String key = rs.getString("key");
174+
String value = rs.getString("value");
175+
String serialization = rs.getString("serialization");
176+
177+
Object deserialized = SerializationUtil.deserializeValue(value, serialization, null);
178+
if (STREAM_CLOSED_SENTINEL.equals(deserialized)) {
179+
continue;
180+
}
181+
182+
streams.computeIfAbsent(key, k -> new ArrayList<>()).add(deserialized);
183+
}
184+
}
185+
}
186+
return streams;
187+
}
188+
189+
private static final org.slf4j.Logger logger =
190+
org.slf4j.LoggerFactory.getLogger(StreamsDAO.class);
191+
192+
static final String STREAM_CLOSED_SENTINEL = "__DBOS_STREAM_CLOSED__";
193+
}

0 commit comments

Comments
 (0)