Skip to content

Commit 08e8bf4

Browse files
authored
Update forkWorkflow (#216)
* populate `workflow_events_history` in `setEvent` * rework `setEvent` for wf vs step usage * copy `workflow_events_history` and `workflow_events` in `forkWorkflow` * don't copy `application_version` in `forkWorkflow` fixes #211
1 parent 728d246 commit 08e8bf4

10 files changed

Lines changed: 239 additions & 60 deletions

File tree

transact/src/main/java/dev/dbos/transact/context/DBOSContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ public Integer getStepId() {
8484
return stepFunctionId;
8585
}
8686

87+
public int getCurrentFunctionId() {
88+
return functionId;
89+
}
90+
8791
public int getAndIncrementFunctionId() {
8892
return functionId++;
8993
}

transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java

Lines changed: 53 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -237,70 +237,86 @@ WITH oldest_entry AS (
237237
}
238238
}
239239

240-
public void setEvent(String workflowId, Integer functionId, String key, Object message)
240+
private void setEvent(
241+
Connection conn, String workflowId, int functionId, String key, String message)
242+
throws SQLException {
243+
final String eventSql =
244+
"""
245+
INSERT INTO %s.workflow_events (workflow_uuid, key, value)
246+
VALUES (?, ?, ?)
247+
ON CONFLICT (workflow_uuid, key)
248+
DO UPDATE SET value = EXCLUDED.value
249+
"""
250+
.formatted(this.schema);
251+
252+
try (PreparedStatement stmt = conn.prepareStatement(eventSql)) {
253+
stmt.setString(1, workflowId);
254+
stmt.setString(2, key);
255+
stmt.setString(3, message);
256+
stmt.executeUpdate();
257+
}
258+
259+
final String eventHistorySql =
260+
"""
261+
INSERT INTO %s.workflow_events_history (workflow_uuid, function_id, key, value)
262+
VALUES (?, ?, ?, ?)
263+
ON CONFLICT (workflow_uuid, key, function_id)
264+
DO UPDATE SET value = EXCLUDED.value
265+
"""
266+
.formatted(this.schema);
267+
268+
try (PreparedStatement stmt = conn.prepareStatement(eventHistorySql)) {
269+
stmt.setString(1, workflowId);
270+
stmt.setInt(2, functionId);
271+
stmt.setString(3, key);
272+
stmt.setString(4, message);
273+
stmt.executeUpdate();
274+
}
275+
}
276+
277+
public void setEvent(
278+
String workflowId, int functionId, String key, Object message, boolean asStep)
241279
throws SQLException {
242280
if (dataSource.isClosed()) {
243281
throw new IllegalStateException("Database is closed!");
244282
}
245283

246284
var startTime = System.currentTimeMillis();
247285
String functionName = "DBOS.setEvent";
286+
String serializedMessage = JSONUtil.serialize(message);
248287

249288
try (Connection conn = dataSource.getConnection()) {
250289
conn.setAutoCommit(false);
251-
conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
252-
253290
try {
254-
// Check if operation was already executed
255-
if (functionId != null) {
256-
StepResult recordedOutput =
291+
if (asStep) {
292+
// check for a previous operation result
293+
var recordedOutput =
257294
StepsDAO.checkStepExecutionTxn(
258295
workflowId, functionId, functionName, conn, this.schema);
259-
260296
if (recordedOutput != null) {
261-
logger.debug("Replaying setEvent, id: {}, key: {}", functionId, key);
297+
logger.debug(
298+
"Replaying setEvent, workflow: {}, step: {}, key: {}", workflowId, functionId, key);
262299
conn.commit();
263300
return; // Already sent before
264301
} else {
265-
logger.debug("Running setEvent, id: {}, key: {}", functionId, key);
302+
logger.debug(
303+
"Running setEvent, workflow: {}, step: {}, key: {}", workflowId, functionId, key);
266304
}
267305
}
268306

269-
// Serialize the message
270-
String serializedMessage = JSONUtil.serialize(message);
271-
272-
// Insert or update the workflow event using UPSERT
273-
final String sql =
274-
"""
275-
INSERT INTO %s.workflow_events (workflow_uuid, key, value)
276-
VALUES (?, ?, ?)
277-
ON CONFLICT (workflow_uuid, key)
278-
DO UPDATE SET value = EXCLUDED.value
279-
"""
280-
.formatted(this.schema);
281-
282-
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
283-
stmt.setString(1, workflowId);
284-
stmt.setString(2, key);
285-
stmt.setString(3, serializedMessage);
286-
stmt.executeUpdate();
287-
}
288-
289-
if (functionId != null) {
290-
// Create operation result
291-
StepResult output = new StepResult(workflowId, functionId, functionName);
307+
this.setEvent(conn, workflowId, functionId, key, serializedMessage);
292308

309+
if (asStep) {
293310
// Record the operation result
311+
StepResult output = new StepResult(workflowId, functionId, functionName);
294312
StepsDAO.recordStepResultTxn(output, startTime, conn, this.schema);
295313
}
296314

297315
conn.commit();
298316
} catch (Exception e) {
299-
logger.debug("setEvent rollback, wf: {} id: {}, key: {}", workflowId, functionId, key);
300-
try {
301-
conn.rollback();
302-
} catch (Exception e2) {
303-
}
317+
logger.error(
318+
"setEvent rollback, workflow: {} id: {}, key: {}", workflowId, functionId, key, e);
319+
conn.rollback();
304320
throw e;
305321
}
306322
}

transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,11 +241,12 @@ public Object recv(
241241
});
242242
}
243243

244-
public void setEvent(String workflowId, Integer functionId, String key, Object message) {
244+
public void setEvent(
245+
String workflowId, int functionId, String key, Object message, boolean asStep) {
245246

246247
DbRetry.run(
247248
() -> {
248-
notificationsDAO.setEvent(workflowId, functionId, key, message);
249+
notificationsDAO.setEvent(workflowId, functionId, key, message, asStep);
249250
});
250251
}
251252

transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -854,12 +854,7 @@ private static void insertForkedWorkflowStatus(
854854
stmt.setString(3, originalStatus.name());
855855
stmt.setString(4, originalStatus.className());
856856
stmt.setString(5, originalStatus.instanceName());
857-
858-
// Use provided application version or fall back to original
859-
String appVersion =
860-
applicationVersion != null ? applicationVersion : originalStatus.appVersion();
861-
stmt.setString(6, appVersion);
862-
857+
stmt.setString(6, applicationVersion);
863858
stmt.setString(7, originalStatus.appId());
864859
stmt.setString(8, originalStatus.authenticatedUser());
865860
stmt.setString(9, JSONUtil.serializeArray(originalStatus.authenticatedRoles()));
@@ -882,7 +877,7 @@ private static void copyOperationOutputs(
882877
String schema)
883878
throws SQLException {
884879

885-
String sql =
880+
String stepOutputsSql =
886881
"""
887882
INSERT INTO %1$s.operation_outputs
888883
(workflow_uuid, function_id, output, error, function_name, child_workflow_id, started_at_epoch_ms, completed_at_epoch_ms)
@@ -891,15 +886,59 @@ private static void copyOperationOutputs(
891886
WHERE workflow_uuid = ? AND function_id < ?
892887
"""
893888
.formatted(schema);
894-
895-
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
889+
try (PreparedStatement stmt = connection.prepareStatement(stepOutputsSql)) {
896890
stmt.setString(1, forkedWorkflowId);
897891
stmt.setString(2, originalWorkflowId);
898892
stmt.setInt(3, startStep);
899893

900894
int rowsCopied = stmt.executeUpdate();
901895
logger.debug("Copied " + rowsCopied + " operation outputs to forked workflow");
902896
}
897+
898+
var eventHistorySql =
899+
"""
900+
INSERT INTO %1$s.workflow_events_history
901+
(workflow_uuid, function_id, key, value)
902+
SELECT ? as workflow_uuid, function_id, key, value
903+
FROM %1$s.workflow_events_history
904+
WHERE workflow_uuid = ? AND function_id < ?
905+
"""
906+
.formatted(schema);
907+
try (PreparedStatement stmt = connection.prepareStatement(eventHistorySql)) {
908+
stmt.setString(1, forkedWorkflowId);
909+
stmt.setString(2, originalWorkflowId);
910+
stmt.setInt(3, startStep);
911+
912+
int rowsCopied = stmt.executeUpdate();
913+
logger.debug("Copied " + rowsCopied + " workflow_events_history to forked workflow");
914+
}
915+
916+
var eventSql =
917+
"""
918+
INSERT INTO %1$s.workflow_events
919+
(workflow_uuid, key, value)
920+
SELECT ?, weh1.key, weh1.value
921+
FROM %1$s.workflow_events_history weh1
922+
WHERE weh1.workflow_uuid = ?
923+
AND weh1.function_id = (
924+
SELECT MAX(weh2.function_id)
925+
FROM %1$s.workflow_events_history weh2
926+
WHERE weh2.workflow_uuid = ?
927+
AND weh2.key = weh1.key
928+
AND weh2.function_id < ?
929+
)
930+
"""
931+
.formatted(schema);
932+
933+
try (PreparedStatement stmt = connection.prepareStatement(eventSql)) {
934+
stmt.setString(1, forkedWorkflowId);
935+
stmt.setString(2, originalWorkflowId);
936+
stmt.setString(3, originalWorkflowId);
937+
stmt.setInt(4, startStep);
938+
939+
int rowsCopied = stmt.executeUpdate();
940+
logger.debug("Copied " + rowsCopied + " workflow_events to forked workflow");
941+
}
903942
}
904943

905944
/*

transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -666,12 +666,10 @@ public void setEvent(String key, Object value) {
666666
if (!ctx.isInWorkflow()) {
667667
throw new IllegalStateException("DBOS.setEvent() must be called from a workflow.");
668668
}
669-
if (!ctx.isInStep()) {
670-
int stepFunctionId = ctx.getAndIncrementFunctionId();
671-
systemDatabase.setEvent(ctx.getWorkflowId(), stepFunctionId, key, value);
672-
} else {
673-
systemDatabase.setEvent(ctx.getWorkflowId(), null, key, value);
674-
}
669+
670+
var asStep = !ctx.isInStep();
671+
var stepId = ctx.isInStep() ? ctx.getCurrentFunctionId() : ctx.getAndIncrementFunctionId();
672+
systemDatabase.setEvent(ctx.getWorkflowId(), stepId, key, value, asStep);
675673
}
676674

677675
public Object getEvent(String workflowId, String key, Duration timeout) {

transact/src/test/java/dev/dbos/transact/notifications/EventsTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,14 @@ public void basic_set_get() throws Exception {
6868
eventService.setEventWorkflow("key1", "value1");
6969
}
7070

71+
var steps = DBOS.listWorkflowSteps("id1");
72+
String[] stepNames = {"DBOS.setEvent", "stepSetEvent", "getEventInStep"};
73+
assertEquals(3, steps.size());
74+
for (var i = 0; i < steps.size(); i++) {
75+
var step = steps.get(i);
76+
assertEquals(stepNames[i], step.functionName());
77+
}
78+
7179
try (var id = new WorkflowOptions("id2").setContext()) {
7280
Object event = eventService.getEventWorkflow("id1", "key1", Duration.ofSeconds(3));
7381
assertEquals("value1", (String) event);
@@ -160,6 +168,12 @@ public void set_twice() throws Exception {
160168
getwfh = DBOSTestAccess.getDbosExecutor().executeWorkflowById(getwfh.workflowId());
161169
res = (String) getwfh.getResult();
162170
assertEquals("value1value2", res);
171+
172+
var events = DBUtils.getWorkflowEvents(dataSource, "id1");
173+
assertEquals(1, events.size());
174+
175+
var eventHistory = DBUtils.getWorkflowEventHistory(dataSource, "id1");
176+
assertEquals(2, eventHistory.size());
163177
}
164178

165179
@Test

transact/src/test/java/dev/dbos/transact/utils/DBUtils.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -276,22 +276,56 @@ public static List<OperationOutputRow> getStepRows(
276276
}
277277
}
278278

279-
public static List<String> getWorkflowEvents(DataSource ds, String workflowId) {
279+
public record Event(String key, String value) {}
280+
281+
public static List<Event> getWorkflowEvents(DataSource ds, String workflowId) {
280282
return getWorkflowEvents(ds, workflowId, null);
281283
}
282284

283-
public static List<String> getWorkflowEvents(DataSource ds, String workflowId, String schema) {
285+
public static List<Event> getWorkflowEvents(DataSource ds, String workflowId, String schema) {
284286
schema = SystemDatabase.sanitizeSchema(schema);
285287
try (var conn = ds.getConnection(); ) {
286288
var stmt =
287289
conn.prepareStatement(
288290
"SELECT * FROM %s.workflow_events WHERE workflow_uuid = ?".formatted(schema));
289291
stmt.setString(1, workflowId);
290292
var rs = stmt.executeQuery();
291-
List<String> rows = new ArrayList<>();
293+
List<Event> rows = new ArrayList<>();
294+
295+
while (rs.next()) {
296+
var key = rs.getString("key");
297+
var value = rs.getString("value");
298+
rows.add(new Event(key, value));
299+
}
300+
301+
return rows;
302+
} catch (SQLException e) {
303+
throw new RuntimeException(e);
304+
}
305+
}
306+
307+
public record EventHistory(int stepId, String key, String value) {}
308+
309+
public static List<EventHistory> getWorkflowEventHistory(DataSource ds, String workflowId) {
310+
return getWorkflowEventHistory(ds, workflowId, null);
311+
}
312+
313+
public static List<EventHistory> getWorkflowEventHistory(
314+
DataSource ds, String workflowId, String schema) {
315+
schema = SystemDatabase.sanitizeSchema(schema);
316+
try (var conn = ds.getConnection(); ) {
317+
var stmt =
318+
conn.prepareStatement(
319+
"SELECT * FROM %s.workflow_events_history WHERE workflow_uuid = ?".formatted(schema));
320+
stmt.setString(1, workflowId);
321+
var rs = stmt.executeQuery();
322+
List<EventHistory> rows = new ArrayList<>();
292323

293324
while (rs.next()) {
294-
rows.add(rs.getString("key") + "=" + rs.getString("value"));
325+
var stepId = rs.getInt("function_id");
326+
var key = rs.getString("key");
327+
var value = rs.getString("value");
328+
rows.add(new EventHistory(stepId, key, value));
295329
}
296330

297331
return rows;

transact/src/test/java/dev/dbos/transact/workflow/ForkService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,6 @@ public interface ForkService {
2323
String child2(Float number);
2424

2525
void setForkService(ForkService service);
26+
27+
void setEventWorkflow(String key) throws InterruptedException;
2628
}

transact/src/test/java/dev/dbos/transact/workflow/ForkServiceImpl.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,25 @@ public void stepFive(boolean b) {
9090
++step5Count;
9191
}
9292

93-
@Workflow(name = "child1")
93+
@Workflow
9494
public String child1(Integer number) {
9595
++child1Count;
9696
return String.valueOf(number);
9797
}
9898

99-
@Workflow(name = "child2")
99+
@Workflow
100100
public String child2(Float number) {
101101
++child2Count;
102102
return String.valueOf(number);
103103
}
104+
105+
@Workflow
106+
public void setEventWorkflow(String key) throws InterruptedException {
107+
for (int i = 0; i < 5; i++) {
108+
DBOS.setEvent(key, "event-%d".formatted(System.currentTimeMillis()));
109+
if (i < 4) {
110+
Thread.sleep(100);
111+
}
112+
}
113+
}
104114
}

0 commit comments

Comments
 (0)