Skip to content

Commit 9a4db11

Browse files
authored
Get Workflow Aggregates, Events, Notifications & Streams (#344)
1 parent 1b412ed commit 9a4db11

21 files changed

Lines changed: 1231 additions & 0 deletions

transact/src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.HashSet;
3838
import java.util.Iterator;
3939
import java.util.List;
40+
import java.util.Map;
4041
import java.util.Objects;
4142
import java.util.Optional;
4243
import java.util.Properties;
@@ -943,6 +944,19 @@ public void applySchedules(@NonNull List<WorkflowSchedule> schedules) {
943944
return ensureLaunched("retrieveWorkflow").retrieveWorkflow(workflowId);
944945
}
945946

947+
/**
948+
* Retrieves all events stored during the execution of a workflow. Events are key-value pairs that
949+
* workflows can set during execution to persist intermediate state or communicate between steps.
950+
* This method returns all events for the specified workflow with their deserialized values.
951+
*
952+
* @param workflowId the unique identifier of the workflow whose events to retrieve
953+
* @return a map containing all events for the workflow, where keys are event names and values are
954+
* the deserialized event data
955+
*/
956+
public @NonNull Map<String, Object> getAllEvents(@NonNull String workflowId) {
957+
return ensureLaunched("getAllEvents").getAllEvents(workflowId);
958+
}
959+
946960
/**
947961
* List all workflows
948962
*

transact/src/main/java/dev/dbos/transact/conductor/Conductor.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,10 @@ CompletableFuture<BaseResponse> getResponseAsync(BaseMessage message, WebSocket
684684
case FORK_WORKFLOW -> handleFork(this, message);
685685
case GET_METRICS -> handleGetMetrics(this, message);
686686
case GET_SCHEDULE -> handleGetSchedule(this, message);
687+
case GET_WORKFLOW_AGGREGATES -> handleGetWorkflowAggregates(this, message);
688+
case GET_WORKFLOW_EVENTS -> handleGetWorkflowEvents(this, message);
689+
case GET_WORKFLOW_NOTIFICATIONS -> handleGetWorkflowNotifications(this, message);
690+
case GET_WORKFLOW_STREAMS -> handleGetWorkflowStreams(this, message);
687691
case GET_WORKFLOW -> handleGetWorkflow(this, message);
688692
case IMPORT_WORKFLOW -> handleImportWorkflow(this, message);
689693
case LIST_APPLICATION_VERSIONS -> handleListApplicationVersions(this, message);
@@ -988,6 +992,75 @@ static CompletableFuture<BaseResponse> handleGetMetrics(
988992
});
989993
}
990994

995+
static CompletableFuture<BaseResponse> handleGetWorkflowAggregates(
996+
Conductor conductor, BaseMessage message) {
997+
return CompletableFuture.supplyAsync(
998+
() -> {
999+
GetWorkflowAggregatesRequest request = (GetWorkflowAggregatesRequest) message;
1000+
try {
1001+
var rows = conductor.systemDatabase.getWorkflowAggregates(request.toInput());
1002+
return new GetWorkflowAggregatesResponse(request, rows);
1003+
} catch (Exception e) {
1004+
logger.error("Exception encountered when getting workflow aggregates", e);
1005+
return new GetWorkflowAggregatesResponse(request, e);
1006+
}
1007+
});
1008+
}
1009+
1010+
static CompletableFuture<BaseResponse> handleGetWorkflowEvents(
1011+
Conductor conductor, BaseMessage message) {
1012+
return CompletableFuture.supplyAsync(
1013+
() -> {
1014+
GetWorkflowEventsRequest request = (GetWorkflowEventsRequest) message;
1015+
try {
1016+
var events = conductor.systemDatabase.getAllEvents(request.workflow_id);
1017+
return new GetWorkflowEventsResponse(request, events);
1018+
} catch (Exception e) {
1019+
logger.error(
1020+
"Exception encountered when getting workflow events for {}",
1021+
request.workflow_id,
1022+
e);
1023+
return new GetWorkflowEventsResponse(request, e);
1024+
}
1025+
});
1026+
}
1027+
1028+
static CompletableFuture<BaseResponse> handleGetWorkflowNotifications(
1029+
Conductor conductor, BaseMessage message) {
1030+
return CompletableFuture.supplyAsync(
1031+
() -> {
1032+
GetWorkflowNotificationsRequest request = (GetWorkflowNotificationsRequest) message;
1033+
try {
1034+
var notifications = conductor.systemDatabase.getAllNotifications(request.workflow_id);
1035+
return new GetWorkflowNotificationsResponse(request, notifications);
1036+
} catch (Exception e) {
1037+
logger.error(
1038+
"Exception encountered when getting workflow notifications for {}",
1039+
request.workflow_id,
1040+
e);
1041+
return new GetWorkflowNotificationsResponse(request, e);
1042+
}
1043+
});
1044+
}
1045+
1046+
static CompletableFuture<BaseResponse> handleGetWorkflowStreams(
1047+
Conductor conductor, BaseMessage message) {
1048+
return CompletableFuture.supplyAsync(
1049+
() -> {
1050+
GetWorkflowStreamsRequest request = (GetWorkflowStreamsRequest) message;
1051+
try {
1052+
var streams = conductor.systemDatabase.getAllStreamEntries(request.workflow_id);
1053+
return new GetWorkflowStreamsResponse(request, streams);
1054+
} catch (Exception e) {
1055+
logger.error(
1056+
"Exception encountered when getting workflow streams for {}",
1057+
request.workflow_id,
1058+
e);
1059+
return new GetWorkflowStreamsResponse(request, e);
1060+
}
1061+
});
1062+
}
1063+
9911064
private static boolean isImportMessage(CharSequence data) {
9921065
int checkLen = Math.min(data.length(), 200);
9931066
return data.subSequence(0, checkLen).toString().contains("\"import_workflow\"");

transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@
2020
@JsonSubTypes.Type(value = ForkWorkflowRequest.class, name = "fork_workflow"),
2121
@JsonSubTypes.Type(value = GetMetricsRequest.class, name = "get_metrics"),
2222
@JsonSubTypes.Type(value = GetScheduleRequest.class, name = "get_schedule"),
23+
@JsonSubTypes.Type(value = GetWorkflowAggregatesRequest.class, name = "get_workflow_aggregates"),
24+
@JsonSubTypes.Type(value = GetWorkflowEventsRequest.class, name = "get_workflow_events"),
25+
@JsonSubTypes.Type(
26+
value = GetWorkflowNotificationsRequest.class,
27+
name = "get_workflow_notifications"),
28+
@JsonSubTypes.Type(value = GetWorkflowStreamsRequest.class, name = "get_workflow_streams"),
2329
@JsonSubTypes.Type(value = GetWorkflowRequest.class, name = "get_workflow"),
2430
@JsonSubTypes.Type(value = ImportWorkflowRequest.class, name = "import_workflow"),
2531
@JsonSubTypes.Type(
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package dev.dbos.transact.conductor.protocol;
2+
3+
import dev.dbos.transact.workflow.GetWorkflowAggregatesInput;
4+
5+
import java.time.Instant;
6+
import java.util.List;
7+
8+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
9+
10+
@JsonIgnoreProperties(ignoreUnknown = true)
11+
public class GetWorkflowAggregatesRequest extends BaseMessage {
12+
13+
public Body body;
14+
15+
@JsonIgnoreProperties(ignoreUnknown = true)
16+
public static class Body {
17+
public boolean group_by_status;
18+
public boolean group_by_name;
19+
public boolean group_by_queue_name;
20+
public boolean group_by_executor_id;
21+
public boolean group_by_application_version;
22+
public List<String> status;
23+
public String start_time;
24+
public String end_time;
25+
public List<String> name;
26+
public List<String> app_version;
27+
public List<String> executor_id;
28+
public List<String> queue_name;
29+
public List<String> workflow_id_prefix;
30+
}
31+
32+
public GetWorkflowAggregatesInput toInput() {
33+
if (body == null) {
34+
return new GetWorkflowAggregatesInput();
35+
}
36+
return new GetWorkflowAggregatesInput(
37+
body.group_by_status,
38+
body.group_by_name,
39+
body.group_by_queue_name,
40+
body.group_by_executor_id,
41+
body.group_by_application_version,
42+
body.name,
43+
body.status,
44+
body.queue_name,
45+
body.executor_id,
46+
body.app_version,
47+
body.workflow_id_prefix,
48+
body.start_time != null ? Instant.parse(body.start_time) : null,
49+
body.end_time != null ? Instant.parse(body.end_time) : null);
50+
}
51+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package dev.dbos.transact.conductor.protocol;
2+
3+
import dev.dbos.transact.workflow.WorkflowAggregateRow;
4+
5+
import java.util.Collections;
6+
import java.util.List;
7+
import java.util.Map;
8+
9+
public class GetWorkflowAggregatesResponse extends BaseResponse {
10+
11+
public static record WorkflowAggregateOutput(Map<String, String> group, long count) {
12+
public static WorkflowAggregateOutput from(WorkflowAggregateRow row) {
13+
return new WorkflowAggregateOutput(row.group(), row.count());
14+
}
15+
}
16+
17+
public List<WorkflowAggregateOutput> output;
18+
19+
public GetWorkflowAggregatesResponse() {}
20+
21+
public GetWorkflowAggregatesResponse(BaseMessage message, List<WorkflowAggregateRow> rows) {
22+
super(message.type, message.request_id);
23+
this.output = rows.stream().map(WorkflowAggregateOutput::from).toList();
24+
}
25+
26+
public GetWorkflowAggregatesResponse(BaseMessage message, Exception ex) {
27+
super(message.type, message.request_id, ex.getMessage());
28+
this.output = Collections.emptyList();
29+
}
30+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package dev.dbos.transact.conductor.protocol;
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
5+
@JsonIgnoreProperties(ignoreUnknown = true)
6+
public class GetWorkflowEventsRequest extends BaseMessage {
7+
public String workflow_id;
8+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package dev.dbos.transact.conductor.protocol;
2+
3+
import dev.dbos.transact.json.JSONUtil;
4+
5+
import java.util.Collections;
6+
import java.util.List;
7+
import java.util.Map;
8+
9+
public class GetWorkflowEventsResponse extends BaseResponse {
10+
11+
public record EventOutput(String key, String value) {
12+
public static EventOutput from(Map.Entry<String, Object> entry) {
13+
return new EventOutput(entry.getKey(), JSONUtil.toJson(entry.getValue()));
14+
}
15+
}
16+
17+
public List<EventOutput> events;
18+
19+
public GetWorkflowEventsResponse() {}
20+
21+
public GetWorkflowEventsResponse(BaseMessage message, Map<String, Object> events) {
22+
super(message.type, message.request_id);
23+
this.events = events.entrySet().stream().map(EventOutput::from).toList();
24+
}
25+
26+
public GetWorkflowEventsResponse(BaseMessage message, Exception ex) {
27+
super(message.type, message.request_id, ex.getMessage());
28+
this.events = Collections.emptyList();
29+
}
30+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package dev.dbos.transact.conductor.protocol;
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
5+
@JsonIgnoreProperties(ignoreUnknown = true)
6+
public class GetWorkflowNotificationsRequest extends BaseMessage {
7+
public String workflow_id;
8+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package dev.dbos.transact.conductor.protocol;
2+
3+
import dev.dbos.transact.json.JSONUtil;
4+
import dev.dbos.transact.workflow.NotificationInfo;
5+
6+
import java.util.Collections;
7+
import java.util.List;
8+
9+
public class GetWorkflowNotificationsResponse extends BaseResponse {
10+
11+
public record NotificationOutput(
12+
String topic, String message, long created_at_epoch_ms, boolean consumed) {
13+
public static NotificationOutput from(NotificationInfo info) {
14+
return new NotificationOutput(
15+
info.topic(), JSONUtil.toJson(info.message()), info.createdAtEpochMs(), info.consumed());
16+
}
17+
}
18+
19+
public List<NotificationOutput> notifications;
20+
21+
public GetWorkflowNotificationsResponse() {}
22+
23+
public GetWorkflowNotificationsResponse(BaseMessage message, List<NotificationInfo> infos) {
24+
super(message.type, message.request_id);
25+
this.notifications = infos.stream().map(NotificationOutput::from).toList();
26+
}
27+
28+
public GetWorkflowNotificationsResponse(BaseMessage message, Exception ex) {
29+
super(message.type, message.request_id, ex.getMessage());
30+
this.notifications = Collections.emptyList();
31+
}
32+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package dev.dbos.transact.conductor.protocol;
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
5+
@JsonIgnoreProperties(ignoreUnknown = true)
6+
public class GetWorkflowStreamsRequest extends BaseMessage {
7+
public String workflow_id;
8+
}

0 commit comments

Comments
 (0)