Skip to content

Commit 226e734

Browse files
authored
Update listWorkflows (#448)
Add completed_after, completed_before, dequeued_after and dequeued_before support to ListWorkflows, including conductor and admin server. Also fix issue where conductor doesn't return an error response on unkown request types fixes #447
1 parent bbcc25b commit 226e734

11 files changed

Lines changed: 594 additions & 146 deletions

File tree

transact/src/main/java/dev/dbos/transact/admin/ListQueuedWorkflowsRequest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ public ListWorkflowsInput asInput() {
4949
parent_workflow_id,
5050
null, // wasForkedFrom
5151
null, // hasParent
52-
null // attributes
52+
null, // attributes
53+
null, // completedAfter
54+
null, // completedBefore
55+
null, // dequeuedAfter
56+
null // dequeuedBefore
5357
);
5458
}
5559
}

transact/src/main/java/dev/dbos/transact/admin/ListWorkflowsRequest.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ public record ListWorkflowsRequest(
1616
@JsonDeserialize(using = StringOrListDeserializer.class) List<String> authenticated_user,
1717
String start_time,
1818
String end_time,
19+
String completed_after,
20+
String completed_before,
21+
String dequeued_after,
22+
String dequeued_before,
1923
@JsonDeserialize(using = StringOrListDeserializer.class) List<String> status,
2024
@JsonDeserialize(using = StringOrListDeserializer.class) List<String> application_version,
2125
@JsonDeserialize(using = StringOrListDeserializer.class) List<String> fork_from,
@@ -53,7 +57,10 @@ public ListWorkflowsInput asInput() {
5357
parent_workflow_id,
5458
null, // wasForkedFrom
5559
null, // hasParent
56-
null // attributes
57-
);
60+
null, // attributes
61+
completed_after != null ? Instant.parse(completed_after) : null,
62+
completed_before != null ? Instant.parse(completed_before) : null,
63+
dequeued_after != null ? Instant.parse(dequeued_after) : null,
64+
dequeued_before != null ? Instant.parse(dequeued_before) : null);
5865
}
5966
}

transact/src/main/java/dev/dbos/transact/conductor/Conductor.java

Lines changed: 126 additions & 112 deletions
Large diffs are not rendered by default.

transact/src/main/java/dev/dbos/transact/conductor/protocol/GetWorkflowRequest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ public ListWorkflowsInput toInput() {
4141
null, // parentWorkflowId
4242
null, // wasForkedFrom
4343
null, // hasParent
44-
null // attributes
44+
null, // attributes
45+
null, // completedAfter
46+
null, // completedBefore
47+
null, // dequeuedAfter
48+
null // dequeuedBefore
4549
);
4650
}
4751
}

transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuedWorkflowsRequest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ public ListWorkflowsInput asInput() {
8787
body.parent_workflow_id,
8888
body.was_forked_from,
8989
body.has_parent,
90-
body.attributes);
90+
body.attributes,
91+
null, // completedAfter
92+
null, // completedBefore
93+
null, // dequeuedAfter
94+
null // dequeuedBefore
95+
);
9196
}
9297
}

transact/src/main/java/dev/dbos/transact/conductor/protocol/ListWorkflowsRequest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ public static class Body {
2626

2727
public String start_time;
2828
public String end_time;
29+
public String completed_after;
30+
public String completed_before;
31+
public String dequeued_after;
32+
public String dequeued_before;
2933

3034
@JsonDeserialize(using = StringOrListDeserializer.class)
3135
public List<String> status;
@@ -85,6 +89,10 @@ public ListWorkflowsInput asInput() {
8589
body.parent_workflow_id,
8690
body.was_forked_from,
8791
body.has_parent,
88-
body.attributes);
92+
body.attributes,
93+
body.completed_after != null ? Instant.parse(body.completed_after) : null,
94+
body.completed_before != null ? Instant.parse(body.completed_before) : null,
95+
body.dequeued_after != null ? Instant.parse(body.dequeued_after) : null,
96+
body.dequeued_before != null ? Instant.parse(body.dequeued_before) : null);
8997
}
9098
}

transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public static MessageType fromValue(String value) {
5252
return type;
5353
}
5454
}
55-
throw new IllegalArgumentException("Unknown message type: " + value);
55+
return null;
5656
}
5757

5858
@Override

transact/src/main/java/dev/dbos/transact/database/dao/WorkflowDAO.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,22 @@ public static List<WorkflowStatus> listWorkflows(DbContext ctx, ListWorkflowsInp
728728
whereConditions.add("created_at <= ?");
729729
parameters.add(input.endTime().toEpochMilli());
730730
}
731+
if (input.completedAfter() != null) {
732+
whereConditions.add("completed_at >= ?");
733+
parameters.add(input.completedAfter().toEpochMilli());
734+
}
735+
if (input.completedBefore() != null) {
736+
whereConditions.add("completed_at <= ?");
737+
parameters.add(input.completedBefore().toEpochMilli());
738+
}
739+
if (input.dequeuedAfter() != null) {
740+
whereConditions.add("started_at_epoch_ms >= ?");
741+
parameters.add(input.dequeuedAfter().toEpochMilli());
742+
}
743+
if (input.dequeuedBefore() != null) {
744+
whereConditions.add("started_at_epoch_ms <= ?");
745+
parameters.add(input.dequeuedBefore().toEpochMilli());
746+
}
731747
if (input.status() != null && !input.status().isEmpty()) {
732748
whereConditions.add("status = ANY(?)");
733749
parameters.add(input.status());

0 commit comments

Comments
 (0)