Skip to content

Commit 59f8cf3

Browse files
authored
add Alert handler support (#284)
fixes #275
1 parent 1eb31de commit 59f8cf3

8 files changed

Lines changed: 293 additions & 55 deletions

File tree

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package dev.dbos.transact;
2+
3+
import java.util.Map;
4+
5+
@FunctionalInterface
6+
public interface AlertHandler {
7+
void invoke(String name, String message, Map<String, String> metadata);
8+
}

transact/src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,15 @@
1616
import dev.dbos.transact.migrations.MigrationManager;
1717
import dev.dbos.transact.tempworkflows.InternalWorkflowsService;
1818
import dev.dbos.transact.tempworkflows.InternalWorkflowsServiceImpl;
19-
import dev.dbos.transact.workflow.*;
19+
import dev.dbos.transact.workflow.ForkOptions;
20+
import dev.dbos.transact.workflow.ListWorkflowsInput;
21+
import dev.dbos.transact.workflow.Queue;
22+
import dev.dbos.transact.workflow.StepInfo;
23+
import dev.dbos.transact.workflow.StepOptions;
24+
import dev.dbos.transact.workflow.Workflow;
25+
import dev.dbos.transact.workflow.WorkflowClassName;
26+
import dev.dbos.transact.workflow.WorkflowHandle;
27+
import dev.dbos.transact.workflow.WorkflowStatus;
2028

2129
import java.io.IOException;
2230
import java.io.InputStream;
@@ -79,6 +87,7 @@ public static class Instance {
7987
private final WorkflowRegistry workflowRegistry = new WorkflowRegistry();
8088
private final QueueRegistry queueRegistry = new QueueRegistry();
8189
private final Set<DBOSLifecycleListener> lifecycleRegistry = ConcurrentHashMap.newKeySet();
90+
private AlertHandler alertHandler;
8291

8392
private DBOSConfig config;
8493

@@ -183,6 +192,14 @@ void clearRegistry() {
183192
registerInternals();
184193
}
185194

195+
public void registerAlertHandler(AlertHandler handler) {
196+
if (dbosExecutor.get() != null) {
197+
throw new IllegalStateException("Cannot set alert handler after DBOS is launched");
198+
}
199+
200+
this.alertHandler = handler;
201+
}
202+
186203
// package private methods for test purposes
187204
@Nullable DBOSExecutor getDbosExecutor() {
188205
return dbosExecutor.get();
@@ -216,10 +233,11 @@ public void launch() {
216233
if (dbosExecutor.compareAndSet(null, executor)) {
217234
executor.start(
218235
this,
219-
new HashSet<DBOSLifecycleListener>(this.lifecycleRegistry),
236+
new HashSet<>(this.lifecycleRegistry),
220237
workflowRegistry.getWorkflowSnapshot(),
221238
workflowRegistry.getInstanceSnapshot(),
222-
queueRegistry.getSnapshot());
239+
queueRegistry.getSnapshot(),
240+
alertHandler);
223241
}
224242
}
225243
}
@@ -299,6 +317,18 @@ public static void registerLifecycleListener(@NonNull DBOSLifecycleListener list
299317
ensureInstance().registerLifecycleListener(listener);
300318
}
301319

320+
/**
321+
* Registers an {@link AlertHandler} to handle alerts generated by DBOS. This method must be
322+
* called before DBOS is launched; attempting to register an alert handler after launch will
323+
* result in an {@link IllegalStateException}.
324+
*
325+
* @param handler the {@link AlertHandler} instance to register; must not be null
326+
* @throws IllegalStateException if called after DBOS has been launched
327+
*/
328+
public static void registerAlertHandler(AlertHandler handler) {
329+
ensureInstance().registerAlertHandler(handler);
330+
}
331+
302332
/**
303333
* Reinitializes the singleton instance of DBOS with config. For use in tests that reinitialize
304334
* DBOS @DBOSConfig config dbos configuration

transact/src/main/java/dev/dbos/transact/conductor/Conductor.java

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,34 @@
11
package dev.dbos.transact.conductor;
22

3-
import dev.dbos.transact.conductor.protocol.*;
3+
import dev.dbos.transact.conductor.protocol.AlertRequest;
4+
import dev.dbos.transact.conductor.protocol.BaseMessage;
5+
import dev.dbos.transact.conductor.protocol.BaseResponse;
6+
import dev.dbos.transact.conductor.protocol.CancelRequest;
7+
import dev.dbos.transact.conductor.protocol.DeleteRequest;
8+
import dev.dbos.transact.conductor.protocol.ExecutorInfoResponse;
9+
import dev.dbos.transact.conductor.protocol.ExistPendingWorkflowsRequest;
10+
import dev.dbos.transact.conductor.protocol.ExistPendingWorkflowsResponse;
11+
import dev.dbos.transact.conductor.protocol.ExportWorkflowRequest;
12+
import dev.dbos.transact.conductor.protocol.ExportWorkflowResponse;
13+
import dev.dbos.transact.conductor.protocol.ForkWorkflowRequest;
14+
import dev.dbos.transact.conductor.protocol.ForkWorkflowResponse;
15+
import dev.dbos.transact.conductor.protocol.GetMetricsRequest;
16+
import dev.dbos.transact.conductor.protocol.GetMetricsResponse;
17+
import dev.dbos.transact.conductor.protocol.GetWorkflowRequest;
18+
import dev.dbos.transact.conductor.protocol.GetWorkflowResponse;
19+
import dev.dbos.transact.conductor.protocol.ImportWorkflowRequest;
20+
import dev.dbos.transact.conductor.protocol.ListQueuedWorkflowsRequest;
21+
import dev.dbos.transact.conductor.protocol.ListStepsRequest;
22+
import dev.dbos.transact.conductor.protocol.ListStepsResponse;
23+
import dev.dbos.transact.conductor.protocol.ListWorkflowsRequest;
24+
import dev.dbos.transact.conductor.protocol.MessageType;
25+
import dev.dbos.transact.conductor.protocol.RecoveryRequest;
26+
import dev.dbos.transact.conductor.protocol.RestartRequest;
27+
import dev.dbos.transact.conductor.protocol.ResumeRequest;
28+
import dev.dbos.transact.conductor.protocol.RetentionRequest;
29+
import dev.dbos.transact.conductor.protocol.SuccessResponse;
30+
import dev.dbos.transact.conductor.protocol.WorkflowOutputsResponse;
31+
import dev.dbos.transact.conductor.protocol.WorkflowsOutput;
432
import dev.dbos.transact.database.SystemDatabase;
533
import dev.dbos.transact.execution.DBOSExecutor;
634
import dev.dbos.transact.json.JSONUtil;
@@ -78,22 +106,23 @@ public class Conductor implements AutoCloseable {
78106
static {
79107
Map<MessageType, BiFunction<Conductor, BaseMessage, CompletableFuture<BaseResponse>>> map =
80108
new java.util.EnumMap<>(MessageType.class);
81-
map.put(MessageType.EXECUTOR_INFO, Conductor::handleExecutorInfo);
82-
map.put(MessageType.RECOVERY, Conductor::handleRecovery);
109+
map.put(MessageType.ALERT, Conductor::handleAlert);
83110
map.put(MessageType.CANCEL, Conductor::handleCancel);
84111
map.put(MessageType.DELETE, Conductor::handleDelete);
85-
map.put(MessageType.RESUME, Conductor::handleResume);
86-
map.put(MessageType.RESTART, Conductor::handleRestart);
112+
map.put(MessageType.EXECUTOR_INFO, Conductor::handleExecutorInfo);
113+
map.put(MessageType.EXIST_PENDING_WORKFLOWS, Conductor::handleExistPendingWorkflows);
114+
map.put(MessageType.EXPORT_WORKFLOW, Conductor::handleExportWorkflow);
87115
map.put(MessageType.FORK_WORKFLOW, Conductor::handleFork);
88-
map.put(MessageType.LIST_WORKFLOWS, Conductor::handleListWorkflows);
116+
map.put(MessageType.GET_METRICS, Conductor::handleGetMetrics);
117+
map.put(MessageType.GET_WORKFLOW, Conductor::handleGetWorkflow);
118+
map.put(MessageType.IMPORT_WORKFLOW, Conductor::handleImportWorkflow);
89119
map.put(MessageType.LIST_QUEUED_WORKFLOWS, Conductor::handleListQueuedWorkflows);
90120
map.put(MessageType.LIST_STEPS, Conductor::handleListSteps);
91-
map.put(MessageType.EXIST_PENDING_WORKFLOWS, Conductor::handleExistPendingWorkflows);
92-
map.put(MessageType.GET_WORKFLOW, Conductor::handleGetWorkflow);
121+
map.put(MessageType.LIST_WORKFLOWS, Conductor::handleListWorkflows);
122+
map.put(MessageType.RECOVERY, Conductor::handleRecovery);
123+
map.put(MessageType.RESTART, Conductor::handleRestart);
124+
map.put(MessageType.RESUME, Conductor::handleResume);
93125
map.put(MessageType.RETENTION, Conductor::handleRetention);
94-
map.put(MessageType.GET_METRICS, Conductor::handleGetMetrics);
95-
map.put(MessageType.IMPORT_WORKFLOW, Conductor::handleImportWorkflow);
96-
map.put(MessageType.EXPORT_WORKFLOW, Conductor::handleExportWorkflow);
97126

98127
dispatchMap = Collections.unmodifiableMap(map);
99128
}
@@ -1008,4 +1037,18 @@ static String serializeExportedWorkflows(List<ExportedWorkflow> workflows) throw
10081037

10091038
return Base64.getEncoder().encodeToString(out.toByteArray());
10101039
}
1040+
1041+
static CompletableFuture<BaseResponse> handleAlert(Conductor conductor, BaseMessage message) {
1042+
return CompletableFuture.supplyAsync(
1043+
() -> {
1044+
AlertRequest request = (AlertRequest) message;
1045+
try {
1046+
conductor.dbosExecutor.fireAlertHandler(
1047+
request.name, request.message, request.metadata);
1048+
return new SuccessResponse(request, true);
1049+
} catch (Exception e) {
1050+
return new SuccessResponse(request, e);
1051+
}
1052+
});
1053+
}
10111054
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package dev.dbos.transact.conductor.protocol;
2+
3+
import java.util.Map;
4+
5+
public class AlertRequest extends BaseMessage {
6+
public String name;
7+
public String message;
8+
public Map<String, String> metadata;
9+
10+
public AlertRequest() {}
11+
12+
public AlertRequest(String requestId, String name, String message, Map<String, String> metadata) {
13+
this.type = MessageType.ALERT.getValue();
14+
this.request_id = requestId;
15+
this.name = name;
16+
this.message = message;
17+
this.metadata = metadata;
18+
}
19+
}

transact/src/main/java/dev/dbos/transact/conductor/protocol/BaseMessage.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,32 @@
11
package dev.dbos.transact.conductor.protocol;
22

3-
import com.fasterxml.jackson.annotation.*;
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
import com.fasterxml.jackson.annotation.JsonSubTypes;
5+
import com.fasterxml.jackson.annotation.JsonTypeInfo;
46

57
@JsonTypeInfo(
68
use = JsonTypeInfo.Id.NAME,
79
include = JsonTypeInfo.As.EXISTING_PROPERTY,
810
property = "type",
911
visible = true)
1012
@JsonSubTypes({
11-
@JsonSubTypes.Type(value = ExecutorInfoRequest.class, name = "executor_info"),
12-
@JsonSubTypes.Type(value = RecoveryRequest.class, name = "recovery"),
13+
@JsonSubTypes.Type(value = AlertRequest.class, name = "alert"),
1314
@JsonSubTypes.Type(value = CancelRequest.class, name = "cancel"),
1415
@JsonSubTypes.Type(value = DeleteRequest.class, name = "delete"),
15-
@JsonSubTypes.Type(value = ResumeRequest.class, name = "resume"),
16-
@JsonSubTypes.Type(value = RestartRequest.class, name = "restart"),
16+
@JsonSubTypes.Type(value = ExecutorInfoRequest.class, name = "executor_info"),
17+
@JsonSubTypes.Type(value = ExistPendingWorkflowsRequest.class, name = "exist_pending_workflows"),
18+
@JsonSubTypes.Type(value = ExportWorkflowRequest.class, name = "export_workflow"),
1719
@JsonSubTypes.Type(value = ForkWorkflowRequest.class, name = "fork_workflow"),
18-
@JsonSubTypes.Type(value = ListWorkflowsRequest.class, name = "list_workflows"),
19-
@JsonSubTypes.Type(value = ListQueuedWorkflowsRequest.class, name = "list_queued_workflows"),
20+
@JsonSubTypes.Type(value = GetMetricsRequest.class, name = "get_metrics"),
2021
@JsonSubTypes.Type(value = GetWorkflowRequest.class, name = "get_workflow"),
21-
@JsonSubTypes.Type(value = ExistPendingWorkflowsRequest.class, name = "exist_pending_workflows"),
22+
@JsonSubTypes.Type(value = ImportWorkflowRequest.class, name = "import_workflow"),
23+
@JsonSubTypes.Type(value = ListQueuedWorkflowsRequest.class, name = "list_queued_workflows"),
2224
@JsonSubTypes.Type(value = ListStepsRequest.class, name = "list_steps"),
25+
@JsonSubTypes.Type(value = ListWorkflowsRequest.class, name = "list_workflows"),
26+
@JsonSubTypes.Type(value = RecoveryRequest.class, name = "recovery"),
27+
@JsonSubTypes.Type(value = RestartRequest.class, name = "restart"),
28+
@JsonSubTypes.Type(value = ResumeRequest.class, name = "resume"),
2329
@JsonSubTypes.Type(value = RetentionRequest.class, name = "retention"),
24-
@JsonSubTypes.Type(value = GetMetricsRequest.class, name = "get_metrics"),
25-
@JsonSubTypes.Type(value = ExportWorkflowRequest.class, name = "export_workflow"),
26-
@JsonSubTypes.Type(value = ImportWorkflowRequest.class, name = "import_workflow"),
2730
})
2831
@JsonIgnoreProperties(ignoreUnknown = true)
2932
public abstract class BaseMessage {

transact/src/main/java/dev/dbos/transact/conductor/protocol/MessageType.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
11
package dev.dbos.transact.conductor.protocol;
22

33
public enum MessageType {
4-
EXECUTOR_INFO("executor_info"),
5-
RECOVERY("recovery"),
4+
ALERT("alert"),
65
CANCEL("cancel"),
76
DELETE("delete"),
8-
LIST_WORKFLOWS("list_workflows"),
9-
LIST_QUEUED_WORKFLOWS("list_queued_workflows"),
10-
RESUME("resume"),
11-
RESTART("restart"),
12-
GET_WORKFLOW("get_workflow"),
7+
EXECUTOR_INFO("executor_info"),
138
EXIST_PENDING_WORKFLOWS("exist_pending_workflows"),
14-
LIST_STEPS("list_steps"),
9+
EXPORT_WORKFLOW("export_workflow"),
1510
FORK_WORKFLOW("fork_workflow"),
16-
RETENTION("retention"),
1711
GET_METRICS("get_metrics"),
18-
EXPORT_WORKFLOW("export_workflow"),
19-
IMPORT_WORKFLOW("import_workflow");
12+
GET_WORKFLOW("get_workflow"),
13+
IMPORT_WORKFLOW("import_workflow"),
14+
LIST_QUEUED_WORKFLOWS("list_queued_workflows"),
15+
LIST_STEPS("list_steps"),
16+
LIST_WORKFLOWS("list_workflows"),
17+
RECOVERY("recovery"),
18+
RESTART("restart"),
19+
RESUME("resume"),
20+
RETENTION("retention");
2021

2122
private final String value;
2223

transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package dev.dbos.transact.execution;
22

3+
import dev.dbos.transact.AlertHandler;
34
import dev.dbos.transact.Constants;
45
import dev.dbos.transact.DBOS;
56
import dev.dbos.transact.StartWorkflowOptions;
@@ -15,7 +16,11 @@
1516
import dev.dbos.transact.database.Result;
1617
import dev.dbos.transact.database.SystemDatabase;
1718
import dev.dbos.transact.database.WorkflowInitResult;
18-
import dev.dbos.transact.exceptions.*;
19+
import dev.dbos.transact.exceptions.DBOSAwaitedWorkflowCancelledException;
20+
import dev.dbos.transact.exceptions.DBOSNonExistentWorkflowException;
21+
import dev.dbos.transact.exceptions.DBOSWorkflowCancelledException;
22+
import dev.dbos.transact.exceptions.DBOSWorkflowExecutionConflictException;
23+
import dev.dbos.transact.exceptions.DBOSWorkflowFunctionNotFoundException;
1924
import dev.dbos.transact.internal.AppVersionComputer;
2025
import dev.dbos.transact.internal.DBOSInvocationHandler;
2126
import dev.dbos.transact.internal.Invocation;
@@ -51,7 +56,13 @@
5156
import java.util.Optional;
5257
import java.util.Set;
5358
import java.util.UUID;
54-
import java.util.concurrent.*;
59+
import java.util.concurrent.CompletableFuture;
60+
import java.util.concurrent.ConcurrentHashMap;
61+
import java.util.concurrent.ExecutorService;
62+
import java.util.concurrent.Executors;
63+
import java.util.concurrent.ScheduledExecutorService;
64+
import java.util.concurrent.ThreadPoolExecutor;
65+
import java.util.concurrent.TimeUnit;
5566
import java.util.concurrent.atomic.AtomicBoolean;
5667
import java.util.concurrent.atomic.AtomicReference;
5768
import java.util.function.Supplier;
@@ -85,6 +96,7 @@ public class DBOSExecutor implements AutoCloseable {
8596
private Conductor conductor;
8697
private ExecutorService executorService;
8798
private ScheduledExecutorService timeoutScheduler;
99+
private AlertHandler alertHandler;
88100
private final AtomicBoolean isRunning = new AtomicBoolean(false);
89101

90102
public DBOSExecutor(DBOSConfig config) {
@@ -119,7 +131,8 @@ public void start(
119131
Set<DBOSLifecycleListener> listenerSet,
120132
Map<String, RegisteredWorkflow> workflowMap,
121133
Map<String, RegisteredWorkflowInstance> instanceMap,
122-
List<Queue> queues) {
134+
List<Queue> queues,
135+
AlertHandler alertHandler) {
123136

124137
if (isRunning.compareAndSet(false, true)) {
125138
logger.info("DBOS Executor starting");
@@ -128,6 +141,7 @@ public void start(
128141
this.instanceMap = Collections.unmodifiableMap(instanceMap);
129142
this.queues = Collections.unmodifiableList(queues);
130143
this.listeners = listenerSet;
144+
this.alertHandler = alertHandler;
131145

132146
if (this.appVersion == null || this.appVersion.isEmpty()) {
133147
List<Class<?>> registeredClasses =
@@ -331,6 +345,18 @@ public Optional<Queue> getQueue(String queueName) {
331345
return Optional.empty();
332346
}
333347

348+
public void fireAlertHandler(String name, String message, Map<String, String> metadata) {
349+
if (alertHandler != null) {
350+
alertHandler.invoke(name, message, metadata);
351+
} else {
352+
logger.warn(
353+
"No AlertHandler configured; dropping alert. name='{}', message='{}', metadata={}",
354+
name,
355+
message,
356+
metadata);
357+
}
358+
}
359+
334360
WorkflowHandle<?, ?> recoverWorkflow(GetPendingWorkflowsOutput output) {
335361
Objects.requireNonNull(output, "output must not be null");
336362
String workflowId = Objects.requireNonNull(output.workflowId(), "workflowId must not be null");

0 commit comments

Comments
 (0)