Skip to content

Commit 59b61ee

Browse files
authored
Update Admin Controller (#28)
Implements most of the admin controller methods with tests. Unimplemented ones marked with TODO for later implementation
1 parent f551e69 commit 59b61ee

8 files changed

Lines changed: 445 additions & 45 deletions

File tree

build.gradle.kts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,13 @@ dependencies {
4545
implementation("org.glassfish.jersey.containers:jersey-container-servlet-core:3.1.0")
4646
implementation("org.glassfish.jersey.inject:jersey-hk2:3.1.1")
4747
implementation("org.glassfish.jersey.core:jersey-server:3.1.0")
48+
implementation("org.glassfish.jersey.media:jersey-media-json-jackson:3.1.0")
4849

4950
testImplementation("ch.qos.logback:logback-classic:1.5.6")
5051
testImplementation("org.mockito:mockito-core:5.12.0")
52+
testImplementation("io.rest-assured:rest-assured:5.4.0")
53+
testImplementation("io.rest-assured:json-path:5.4.0")
54+
testImplementation("io.rest-assured:xml-path:5.4.0")
5155
testImplementation(platform("org.junit:junit-bom:5.10.0"))
5256
testImplementation("org.junit.jupiter:junit-jupiter")
5357
}

src/main/java/dev/dbos/transact/execution/DBOSExecutor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import java.lang.reflect.InvocationTargetException;
2929
import java.lang.reflect.Method;
30+
import java.util.List;
3031
import java.util.Optional;
3132
import java.util.UUID;
3233
import java.util.concurrent.*;
@@ -72,6 +73,13 @@ public WorkflowFunctionWrapper getWorkflow(String workflowName) {
7273
return workflowRegistry.get(workflowName);
7374
}
7475

76+
public List<Queue> getAllQueuesSnapshot() {
77+
if (queueService == null) {
78+
throw new IllegalStateException("QueueService not set in DBOSExecutor");
79+
}
80+
return queueService.getAllQueuesSnapshot();
81+
}
82+
7583
public WorkflowInitResult preInvokeWorkflow(String workflowName, String className,
7684
Object[] inputs, String workflowId, String queueName) {
7785

@@ -411,7 +419,7 @@ public WorkflowHandle executeWorkflowById(String workflowId) {
411419
throw new WorkflowFunctionNotFoundException(workflowId);
412420
}
413421

414-
WorkflowHandle handle = null;
422+
WorkflowHandle<?> handle = null;
415423
try (SetWorkflowID id = new SetWorkflowID(workflowId)) {
416424
DBOSContextHolder.get().setInWorkflow(true);
417425
try {

src/main/java/dev/dbos/transact/http/HttpServer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import org.apache.catalina.Context;
88
import org.apache.catalina.startup.Tomcat;
9+
import org.glassfish.jersey.jackson.JacksonFeature;
910
import org.glassfish.jersey.server.ResourceConfig;
1011
import org.glassfish.jersey.servlet.ServletContainer;
1112
import org.slf4j.Logger;
@@ -71,13 +72,14 @@ private void setUpContext() {
7172
ResourceConfig resourceConfig = new ResourceConfig();
7273
resourceConfig.registerInstances(adminController);
7374

75+
// Register Jackson JSON providers for proper JSON serialization
76+
resourceConfig.register(JacksonFeature.class);
77+
7478
// In future if we need to scan from a package
7579
// resourceConfig.packages(pkg);
7680

7781
// Add the REST API servlet
78-
var jerseyservlet = tomcat.addServlet(contextPath,
79-
"jersey-servlet",
80-
new ServletContainer(resourceConfig));
82+
tomcat.addServlet(contextPath, "jersey-servlet", new ServletContainer(resourceConfig));
8183
context.addServletMappingDecoded("/*", "jersey-servlet");
8284
}
8385
}

src/main/java/dev/dbos/transact/http/controllers/AdminController.java

Lines changed: 126 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,21 @@
22

33
import dev.dbos.transact.database.SystemDatabase;
44
import dev.dbos.transact.execution.DBOSExecutor;
5+
import dev.dbos.transact.queue.Queue;
6+
import dev.dbos.transact.queue.QueueMetadata;
7+
import dev.dbos.transact.workflow.ForkOptions;
58
import dev.dbos.transact.workflow.ListWorkflowsInput;
69
import dev.dbos.transact.workflow.StepInfo;
10+
import dev.dbos.transact.workflow.WorkflowHandle;
711
import dev.dbos.transact.workflow.WorkflowStatus;
812

13+
import java.sql.SQLException;
914
import java.util.ArrayList;
1015
import java.util.List;
1116

1217
import jakarta.ws.rs.*;
1318
import jakarta.ws.rs.core.MediaType;
19+
import jakarta.ws.rs.core.Response;
1420
import org.slf4j.Logger;
1521
import org.slf4j.LoggerFactory;
1622

@@ -27,79 +33,170 @@ public AdminController(SystemDatabase s, DBOSExecutor e) {
2733
}
2834

2935
@GET
30-
@Path("/healthz")
36+
@Path("/dbos-healthz")
3137
@Produces(MediaType.TEXT_PLAIN)
3238
public String health() {
33-
return "Healthy";
39+
return "healthy";
3440
}
3541

3642
@GET
37-
@Path("/deactivate")
38-
@Produces(MediaType.TEXT_PLAIN)
39-
public String deactivate() {
40-
return "deactivated";
43+
@Path("/dbos-perf")
44+
public Response perf() {
45+
// TODO: implement perf hooks
46+
return Response.serverError().build();
4147
}
4248

4349
@GET
44-
@Path("/workflow-queues-metadata")
45-
@Produces(MediaType.APPLICATION_JSON)
46-
public Object workflowQueuesMetadata() {
47-
return "queuesMetadata";
50+
@Path("/dbos-deactivate")
51+
public Response deactivate() {
52+
// TODO: implement dbosExec.deactivateEventReceivers
53+
return Response.serverError().build();
4854
}
4955

50-
@GET
51-
@Path("/workflows/{workflowId}/steps")
56+
@POST
57+
@Path("/dbos-workflow-recovery")
58+
@Consumes(MediaType.APPLICATION_JSON)
5259
@Produces(MediaType.APPLICATION_JSON)
53-
public List<StepInfo> ListSteps(@PathParam("workflowId") String workflowId) {
54-
logger.info("Retrieving steps for workflow: " + workflowId);
55-
return systemDatabase.listWorkflowSteps(workflowId);
60+
public Response recovery(List<String> executorIds) {
61+
// TODO: implement dbosExec.recoverPendingWorkflows
62+
return Response.serverError().build();
63+
}
64+
65+
@POST
66+
@Path("/dbos-garbage-collect")
67+
public Response garbageCollect() {
68+
// TODO: implement systemDatabase.garbageCollect
69+
return Response.serverError().build();
70+
}
71+
72+
@POST
73+
@Path("/dbos-global-timeout")
74+
public Response globalTimeout() {
75+
// TODO: implement globalTimeout
76+
return Response.serverError().build();
5677
}
5778

5879
@GET
59-
@Path("/workflows/{workflowId}")
80+
@Path("/dbos-workflow-queues-metadata")
6081
@Produces(MediaType.APPLICATION_JSON)
61-
public List<WorkflowStatus> ListWorkflows(@PathParam("workflowId") String workflowId) {
62-
return new ArrayList<>();
82+
public List<QueueMetadata> workflowQueuesMetadata() {
83+
List<Queue> queues = dbosExecutor.getAllQueuesSnapshot();
84+
List<QueueMetadata> metadataList = new ArrayList<QueueMetadata>();
85+
for (Queue queue : queues) {
86+
metadataList.add(new QueueMetadata(queue));
87+
}
88+
return metadataList;
6389
}
6490

6591
@POST
66-
@Path("/recovery")
92+
@Path("/queues")
93+
@Consumes(MediaType.APPLICATION_JSON)
6794
@Produces(MediaType.APPLICATION_JSON)
68-
public List<String> recovery() {
69-
70-
return new ArrayList<>();
95+
public Response listQueuedWorkflow() {
96+
// TODO: implement dbosExec.listQueuedWorkflows
97+
return Response.serverError().build();
7198
}
7299

73100
@POST
74101
@Path("/workflows")
75102
@Consumes(MediaType.APPLICATION_JSON)
76103
@Produces(MediaType.APPLICATION_JSON)
77-
public List<WorkflowStatus> workflows(ListWorkflowsInput input) {
104+
public List<WorkflowStatus> listWorkflows(ListWorkflowsInput input) {
105+
if (input == null) {
106+
input = new ListWorkflowsInput();
107+
}
108+
109+
try {
110+
return systemDatabase.listWorkflows(input);
111+
} catch (SQLException e) {
112+
logger.error("Error listing workflows {}", e.getMessage());
113+
throw new WebApplicationException(e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR);
114+
}
115+
}
116+
117+
@GET
118+
@Path("/workflows/{workflowId}")
119+
@Produces(MediaType.APPLICATION_JSON)
120+
public WorkflowStatus GetWorkflowStatus(@PathParam("workflowId") String workflowId) {
121+
logger.info("Get workflow status for workflow {}", workflowId);
122+
return systemDatabase.getWorkflowStatus(workflowId);
123+
}
78124

79-
return new ArrayList<>();
125+
@GET
126+
@Path("/workflows/{workflowId}/steps")
127+
@Produces(MediaType.APPLICATION_JSON)
128+
public List<StepInfo> ListSteps(@PathParam("workflowId") String workflowId) {
129+
logger.info("Retrieving steps for workflow {}", workflowId);
130+
return systemDatabase.listWorkflowSteps(workflowId);
80131
}
81132

82133
@POST
83134
@Path("/workflows/{workflowId}/restart")
84135
@Produces(MediaType.APPLICATION_JSON)
85-
public void restart(@PathParam("workflowId") String workflowId) {
136+
public ForkWorkflowResponse restart(@PathParam("workflowId") String workflowId) {
137+
logger.info("Restarting workflow {} with a new ID", workflowId);
138+
WorkflowHandle<?> handle = dbosExecutor.forkWorkflow(workflowId, 0, null);
139+
return new ForkWorkflowResponse(handle.getWorkflowId());
86140
}
87141

88142
@POST
89143
@Path("/workflows/{workflowId}/resume")
90144
@Produces(MediaType.APPLICATION_JSON)
91-
public void resume(@PathParam("workflowId") String workflowId) {
145+
public Response resume(@PathParam("workflowId") String workflowId) {
146+
logger.info("Resuming workflow {}", workflowId);
147+
dbosExecutor.resumeWorkflow(workflowId);
148+
return Response.noContent().build();
92149
}
93150

94151
@POST
95152
@Path("/workflows/{workflowId}/fork")
153+
@Consumes(MediaType.APPLICATION_JSON)
96154
@Produces(MediaType.APPLICATION_JSON)
97-
public void fork(@PathParam("workflowId") String workflowId) {
155+
public ForkWorkflowResponse fork(@PathParam("workflowId") String workflowId, ForkWorkflowRequest request) {
156+
if (request == null) {
157+
request = new ForkWorkflowRequest();
158+
}
159+
int startStep = (request.startStep != null) ? request.startStep : 0;
160+
logger.info("Forking workflow {} from step {} with a new ID", workflowId, startStep);
161+
162+
ForkOptions.Builder builder = ForkOptions.builder();
163+
if (request.newWorkflowId != null) {
164+
builder.forkedWorkflowId(request.newWorkflowId);
165+
}
166+
if (request.applicationVersion != null) {
167+
builder.applicationVersion(request.applicationVersion);
168+
}
169+
if (request.timeoutMs != null) {
170+
builder.timeoutMS(request.timeoutMs);
171+
}
172+
173+
WorkflowHandle<?> handle = dbosExecutor.forkWorkflow(workflowId, startStep, builder.build());
174+
return new ForkWorkflowResponse(handle.getWorkflowId());
98175
}
99176

100177
@POST
101178
@Path("/workflows/{workflowId}/cancel")
102-
@Produces(MediaType.APPLICATION_JSON)
103-
public void cancel(@PathParam("workflowId") String workflowId) {
179+
public Response cancel(@PathParam("workflowId") String workflowId) {
180+
logger.info("Cancel workflow {}", workflowId);
181+
dbosExecutor.cancelWorkflow(workflowId);
182+
return Response.noContent().build();
183+
}
184+
185+
public static class ForkWorkflowRequest {
186+
public Integer startStep;
187+
public String newWorkflowId;
188+
public String applicationVersion;
189+
public Long timeoutMs;
190+
191+
public ForkWorkflowRequest() {
192+
}
193+
}
194+
195+
public static class ForkWorkflowResponse {
196+
public String workflowId;
197+
198+
public ForkWorkflowResponse(String workflowId) {
199+
this.workflowId = workflowId;
200+
}
104201
}
105202
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package dev.dbos.transact.queue;
2+
3+
public class QueueMetadata {
4+
public String name;
5+
public Integer concurrency;
6+
public Integer workerConcurrency;
7+
public RateLimitMetadata rateLimit;
8+
public Boolean priorityEnabled;
9+
10+
public QueueMetadata(Queue queue) {
11+
this.name = queue.getName();
12+
this.concurrency = queue.getConcurrency();
13+
this.workerConcurrency = queue.getWorkerConcurrency();
14+
if (queue.getRateLimit() != null) {
15+
this.rateLimit = new RateLimitMetadata(queue.getRateLimit());
16+
} else {
17+
this.rateLimit = null;
18+
}
19+
this.priorityEnabled = queue.isPriorityEnabled();
20+
}
21+
22+
public static class RateLimitMetadata {
23+
public Integer limit;
24+
public Double period;
25+
26+
public RateLimitMetadata(dev.dbos.transact.queue.RateLimit rateLimit) {
27+
this.limit = rateLimit.getLimit();
28+
this.period = rateLimit.getPeriod();
29+
}
30+
}
31+
}

src/main/java/dev/dbos/transact/queue/QueueService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,4 +157,8 @@ public synchronized boolean isStopped() {
157157
// We also check !workerThread.isAlive() as a final confirmation.
158158
return shutdownLatch != null && shutdownLatch.getCount() == 0 && !workerThread.isAlive();
159159
}
160+
161+
public List<Queue> getAllQueuesSnapshot() {
162+
return queueRegistry.getAllQueuesSnapshot();
163+
}
160164
}

0 commit comments

Comments
 (0)