Skip to content

Commit 728d246

Browse files
authored
Workflow Introspection (#212)
Adds support for Workflow forkedFrom and Step startedAt / completedAt fixes #207
1 parent c5b30ad commit 728d246

20 files changed

Lines changed: 405 additions & 313 deletions

transact/src/main/java/dev/dbos/transact/admin/ListWorkflowsRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public ListWorkflowsInput asInput() {
4040
load_output,
4141
queue_name,
4242
queue_name != null ? true : false,
43-
null // Executor IDs
43+
null, // Executor IDs
44+
null // forkedFrom
4445
);
4546
}
4647
}

transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuedWorkflowsRequest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public static class Body {
1616
public String start_time;
1717
public String end_time;
1818
public String status;
19+
public String forked_from;
1920
public String queue_name;
2021
public Integer limit;
2122
public Integer offset;
@@ -28,6 +29,7 @@ public static class Builder {
2829
private String start_time;
2930
private String end_time;
3031
private String status;
32+
private String forked_from;
3133
private String queue_name;
3234
private Integer limit;
3335
private Integer offset;
@@ -54,6 +56,11 @@ public Builder status(String status) {
5456
return this;
5557
}
5658

59+
public Builder forkedFrom(String forkedFrom) {
60+
this.forked_from = forkedFrom;
61+
return this;
62+
}
63+
5764
public Builder queueName(String queueName) {
5865
this.queue_name = queueName;
5966
return this;
@@ -89,6 +96,7 @@ public ListQueuedWorkflowsRequest build(String requestId) {
8996
body.start_time = this.start_time;
9097
body.end_time = this.end_time;
9198
body.status = this.status;
99+
body.forked_from = this.forked_from;
92100
body.queue_name = this.queue_name;
93101
body.limit = this.limit;
94102
body.offset = this.offset;
@@ -108,6 +116,7 @@ public ListWorkflowsInput asInput() {
108116
.withStartTime(body.start_time != null ? OffsetDateTime.parse(body.start_time) : null)
109117
.withEndTime(body.end_time != null ? OffsetDateTime.parse(body.end_time) : null)
110118
.withStatus(body.status)
119+
.withForkedFrom(body.forked_from)
111120
.withQueueName(body.queue_name)
112121
.withLimit(body.limit)
113122
.withOffset(body.offset)

transact/src/main/java/dev/dbos/transact/conductor/protocol/ListStepsResponse.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ public static class Step {
1515
public String output;
1616
public String error;
1717
public String child_workflow_id;
18+
public String started_at_epoch_ms;
19+
public String completed_at_epoch_ms;
1820

1921
public Step(StepInfo info) {
2022
Object output = info.output();
@@ -26,6 +28,10 @@ public Step(StepInfo info) {
2628
this.error =
2729
error != null ? String.format("%s: %s", error.className(), error.message()) : null;
2830
this.child_workflow_id = info.childWorkflowId();
31+
this.started_at_epoch_ms =
32+
info.startedAtEpochMs() == null ? null : info.startedAtEpochMs().toString();
33+
this.completed_at_epoch_ms =
34+
info.completedAtEpochMs() == null ? null : info.completedAtEpochMs().toString();
2935
}
3036
}
3137

transact/src/main/java/dev/dbos/transact/conductor/protocol/ListWorkflowsRequest.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public static class Body {
1919
public String start_time;
2020
public String end_time;
2121
public String status;
22+
public String forked_from;
2223
public String application_version;
2324
public Integer limit;
2425
public Integer offset;
@@ -34,6 +35,7 @@ public static class Builder {
3435
private String start_time;
3536
private String end_time;
3637
private String status;
38+
private String forked_from;
3739
private String application_version;
3840
private Integer limit;
3941
private Integer offset;
@@ -51,18 +53,6 @@ public Builder workflowId(String workflow_id) {
5153
return this;
5254
}
5355

54-
/* Future
55-
public Builder className(String class_name) {
56-
this.class_name = class_name;
57-
return this;
58-
}
59-
60-
public Builder instanceName(String instance_name) {
61-
this.instance_name = instance_name;
62-
return this;
63-
}
64-
*/
65-
6656
public Builder workflowName(String workflow_name) {
6757
this.workflow_name = workflow_name;
6858
return this;
@@ -88,6 +78,11 @@ public Builder status(String status) {
8878
return this;
8979
}
9080

81+
public Builder forkedFrom(String forked_from) {
82+
this.forked_from = forked_from;
83+
return this;
84+
}
85+
9186
public Builder applicationVersion(String application_version) {
9287
this.application_version = application_version;
9388
return this;
@@ -115,13 +110,12 @@ public ListWorkflowsRequest build(String requestId) {
115110

116111
Body body = new Body();
117112
body.workflow_uuids = this.workflow_uuids;
118-
// body.class_name = this.class_name;
119-
// body.instance_name = this.instance_name;
120113
body.workflow_name = this.workflow_name;
121114
body.authenticated_user = this.authenticated_user;
122115
body.start_time = this.start_time;
123116
body.end_time = this.end_time;
124117
body.status = this.status;
118+
body.forked_from = this.forked_from;
125119
body.application_version = this.application_version;
126120
body.limit = this.limit;
127121
body.offset = this.offset;
@@ -141,6 +135,7 @@ public ListWorkflowsInput asInput() {
141135
.withStartTime(body.start_time != null ? OffsetDateTime.parse(body.start_time) : null)
142136
.withEndTime(body.end_time != null ? OffsetDateTime.parse(body.end_time) : null)
143137
.withStatus(body.status)
138+
.withForkedFrom(body.forked_from)
144139
.withApplicationVersion(body.application_version)
145140
.withLimit(body.limit)
146141
.withOffset(body.offset)

transact/src/main/java/dev/dbos/transact/conductor/protocol/WorkflowsOutput.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import dev.dbos.transact.json.JSONUtil;
44
import dev.dbos.transact.workflow.WorkflowStatus;
55

6+
import java.util.Objects;
7+
68
public class WorkflowsOutput {
79
public String WorkflowUUID;
810

@@ -23,6 +25,12 @@ public class WorkflowsOutput {
2325
public String QueueName;
2426
public String ApplicationVersion;
2527
public String ExecutorID;
28+
public String WorkflowTimeoutMS;
29+
public String WorkflowDeadlineEpochMS;
30+
public String DeduplicationID;
31+
public String Priority;
32+
public String QueuePartitionKey;
33+
public String ForkedFrom;
2634

2735
public WorkflowsOutput(WorkflowStatus status) {
2836
Object[] input = status.input();
@@ -54,5 +62,12 @@ public WorkflowsOutput(WorkflowStatus status) {
5462
this.QueueName = status.queueName();
5563
this.ApplicationVersion = status.appVersion();
5664
this.ExecutorID = status.executorId();
65+
this.WorkflowTimeoutMS = status.timeoutMs() == null ? null : status.timeoutMs().toString();
66+
this.WorkflowDeadlineEpochMS =
67+
status.deadlineEpochMs() == null ? null : status.deadlineEpochMs().toString();
68+
this.DeduplicationID = status.deduplicationId();
69+
this.Priority = Objects.requireNonNullElse(status.priority(), 0).toString();
70+
this.QueuePartitionKey = status.partitionKey();
71+
this.ForkedFrom = status.forkedFrom();
5772
}
5873
}

transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java

Lines changed: 21 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public void send(
4040
throw new IllegalStateException("Database is closed!");
4141
}
4242

43+
var startTime = System.currentTimeMillis();
4344
String functionName = "DBOS.send";
4445
String finalTopic = (topic != null) ? topic : Constants.DBOS_NULL_TOPIC;
4546

@@ -89,14 +90,8 @@ public void send(
8990
}
9091

9192
// Record operation result
92-
StepResult output = new StepResult();
93-
output.setWorkflowId(workflowUuid);
94-
output.setStepId(functionId);
95-
output.setFunctionName(functionName);
96-
output.setOutput(null);
97-
output.setError(null);
98-
99-
StepsDAO.recordStepResultTxn(output, conn, this.schema);
93+
var output = new StepResult(workflowUuid, functionId, functionName);
94+
StepsDAO.recordStepResultTxn(output, startTime, conn, this.schema);
10095

10196
conn.commit();
10297

@@ -119,6 +114,7 @@ public Object recv(
119114
throw new IllegalStateException("Database is closed!");
120115
}
121116

117+
var startTime = System.currentTimeMillis();
122118
String functionName = "DBOS.recv";
123119
String finalTopic = (topic != null) ? topic : Constants.DBOS_NULL_TOPIC;
124120

@@ -132,8 +128,8 @@ public Object recv(
132128

133129
if (recordedOutput != null) {
134130
logger.debug("Replaying recv, id: {}, topic: {}", functionId, finalTopic);
135-
if (recordedOutput.getOutput() != null) {
136-
Object[] dSerOut = JSONUtil.deserializeToArray(recordedOutput.getOutput());
131+
if (recordedOutput.output() != null) {
132+
Object[] dSerOut = JSONUtil.deserializeToArray(recordedOutput.output());
137133
return dSerOut == null ? null : dSerOut[0];
138134
} else {
139135
throw new RuntimeException("No output recorded in the last recv");
@@ -225,15 +221,11 @@ WITH oldest_entry AS (
225221
}
226222

227223
// Record operation result
228-
StepResult output = new StepResult();
229-
output.setWorkflowId(workflowUuid);
230-
output.setStepId(functionId);
231-
output.setFunctionName(functionName);
232224
Object toSave = recvdSermessage == null ? null : recvdSermessage[0];
233-
output.setOutput(JSONUtil.serialize(toSave));
234-
output.setError(null);
235-
236-
StepsDAO.recordStepResultTxn(output, conn, this.schema);
225+
StepResult output =
226+
new StepResult(workflowUuid, functionId, functionName)
227+
.withOutput(JSONUtil.serialize(toSave));
228+
StepsDAO.recordStepResultTxn(output, startTime, conn, this.schema);
237229

238230
conn.commit();
239231
return toSave;
@@ -251,6 +243,7 @@ public void setEvent(String workflowId, Integer functionId, String key, Object m
251243
throw new IllegalStateException("Database is closed!");
252244
}
253245

246+
var startTime = System.currentTimeMillis();
254247
String functionName = "DBOS.setEvent";
255248

256249
try (Connection conn = dataSource.getConnection()) {
@@ -295,15 +288,10 @@ ON CONFLICT (workflow_uuid, key)
295288

296289
if (functionId != null) {
297290
// Create operation result
298-
StepResult output = new StepResult();
299-
output.setWorkflowId(workflowId);
300-
output.setStepId(functionId);
301-
output.setFunctionName(functionName);
302-
output.setOutput(null);
303-
output.setError(null);
291+
StepResult output = new StepResult(workflowId, functionId, functionName);
304292

305293
// Record the operation result
306-
StepsDAO.recordStepResultTxn(output, conn, this.schema);
294+
StepsDAO.recordStepResultTxn(output, startTime, conn, this.schema);
307295
}
308296

309297
conn.commit();
@@ -324,6 +312,8 @@ public Object getEvent(
324312
if (dataSource.isClosed()) {
325313
throw new IllegalStateException("Database is closed!");
326314
}
315+
316+
var startTime = System.currentTimeMillis();
327317
String functionName = "DBOS.getEvent";
328318

329319
// Check for previous executions only if it's in a workflow
@@ -343,8 +333,8 @@ public Object getEvent(
343333

344334
if (recordedOutput != null) {
345335
logger.debug("Replaying getEvent, id: {}, key: {}", callerCtx.getFunctionId(), key);
346-
if (recordedOutput.getOutput() != null) {
347-
Object[] outputArray = JSONUtil.deserializeToArray(recordedOutput.getOutput());
336+
if (recordedOutput.output() != null) {
337+
Object[] outputArray = JSONUtil.deserializeToArray(recordedOutput.output());
348338
return outputArray == null ? null : outputArray[0];
349339
} else {
350340
throw new RuntimeException("No output recorded in the last getEvent");
@@ -430,15 +420,10 @@ public Object getEvent(
430420

431421
// Record the output if it's in a workflow
432422
if (callerCtx != null) {
433-
StepResult output = new StepResult();
434-
output.setWorkflowId(callerCtx.getWorkflowId());
435-
output.setStepId(callerCtx.getFunctionId());
436-
output.setFunctionName(functionName);
437-
output.setOutput(JSONUtil.serialize(value)); // null will be serialized to
438-
// 'null'
439-
output.setError(null);
440-
441-
StepsDAO.recordStepResultTxn(dataSource, output, this.schema);
423+
StepResult output =
424+
new StepResult(callerCtx.getWorkflowId(), callerCtx.getFunctionId(), functionName)
425+
.withOutput(JSONUtil.serialize(value));
426+
StepsDAO.recordStepResultTxn(dataSource, output, startTime, this.schema);
442427
}
443428

444429
return value;

0 commit comments

Comments
 (0)