Skip to content

Commit ae8d1ff

Browse files
authored
Executor Info Conductor Support (#32)
1 parent f348853 commit ae8d1ff

5 files changed

Lines changed: 64 additions & 10 deletions

File tree

src/main/java/dev/dbos/transact/conductor/Conductor.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import dev.dbos.transact.workflow.WorkflowStatus;
1313
import dev.dbos.transact.workflow.internal.GetPendingWorkflowsOutput;
1414

15+
import java.net.InetAddress;
1516
import java.net.URI;
1617
import java.net.http.HttpClient;
1718
import java.net.http.WebSocket;
@@ -341,9 +342,13 @@ BaseResponse getResponse(BaseMessage message) {
341342
}
342343

343344
static BaseResponse handleExecutorInfo(Conductor conductor, BaseMessage message) {
344-
// TODO: real implementation
345-
ExecutorInfoRequest request = (ExecutorInfoRequest) message;
346-
return new ExecutorInfoResponse(request, new RuntimeException("not yet implemented"));
345+
try {
346+
String hostname = InetAddress.getLocalHost().getHostName();
347+
return new ExecutorInfoResponse(message, conductor.dbosExecutor.getExecutorId(),
348+
conductor.dbosExecutor.getAppVersion(), hostname);
349+
} catch (Exception e) {
350+
return new ExecutorInfoResponse(message, e);
351+
}
347352
}
348353

349354
static BaseResponse handleRecovery(Conductor conductor, BaseMessage message) {

src/main/java/dev/dbos/transact/conductor/protocol/ExecutorInfoRequest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,12 @@
22

33
public class ExecutorInfoRequest extends BaseMessage {
44
// empty on purpose
5+
6+
public ExecutorInfoRequest() {
7+
}
8+
9+
public ExecutorInfoRequest(String requestId) {
10+
this.type = MessageType.EXECUTOR_INFO.getValue();
11+
this.request_id = requestId;
12+
}
513
}

src/main/java/dev/dbos/transact/conductor/protocol/ListQueuedWorkflowsRequest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,11 @@ public ListQueuedWorkflowsRequest build(String requestId) {
8787
body.start_time = this.start_time;
8888
body.end_time = this.end_time;
8989
body.status = this.status;
90-
body.queue_name= this.queue_name;
91-
body.limit= this.limit;
92-
body.offset= this.offset;
93-
body.sort_desc= this.sort_desc;
94-
body.load_input= this.load_input;
90+
body.queue_name = this.queue_name;
91+
body.limit = this.limit;
92+
body.offset = this.offset;
93+
body.sort_desc = this.sort_desc;
94+
body.load_input = this.load_input;
9595
request.body = body;
9696
return request;
9797
}

src/main/java/dev/dbos/transact/execution/DBOSExecutor.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ public String getAppName() {
5959
return config.getName();
6060
}
6161

62+
public String getExecutorId() {
63+
return GlobalParams.getInstance().getExecutorId();
64+
}
65+
66+
public String getAppVersion() {
67+
return GlobalParams.getInstance().getAppVersion();
68+
}
69+
6270
public void setQueueService(QueueService queueService) {
6371
this.queueService = queueService;
6472
}

src/test/java/dev/dbos/transact/conductor/ConductorTests.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static org.junit.jupiter.api.Assertions.*;
44
import static org.mockito.ArgumentMatchers.any;
5+
import static org.mockito.ArgumentMatchers.anyBoolean;
56
import static org.mockito.ArgumentMatchers.anyInt;
67
import static org.mockito.ArgumentMatchers.anyString;
78
import static org.mockito.ArgumentMatchers.eq;
@@ -10,6 +11,7 @@
1011
import dev.dbos.transact.conductor.TestWebSocketServer.WebSocketTestListener;
1112
import dev.dbos.transact.conductor.protocol.BaseMessage;
1213
import dev.dbos.transact.conductor.protocol.CancelRequest;
14+
import dev.dbos.transact.conductor.protocol.ExecutorInfoRequest;
1315
import dev.dbos.transact.conductor.protocol.ExistPendingWorkflowsRequest;
1416
import dev.dbos.transact.conductor.protocol.ForkWorkflowRequest;
1517
import dev.dbos.transact.conductor.protocol.GetWorkflowRequest;
@@ -29,6 +31,7 @@
2931
import dev.dbos.transact.workflow.WorkflowStatus;
3032
import dev.dbos.transact.workflow.internal.GetPendingWorkflowsOutput;
3133

34+
import java.net.InetAddress;
3235
import java.time.OffsetDateTime;
3336
import java.util.ArrayList;
3437
import java.util.List;
@@ -237,6 +240,36 @@ public void send(BaseMessage message) throws Exception {
237240
}
238241
}
239242

243+
@Test
244+
public void canExecutorInfo() throws Exception {
245+
MessageListener listener = new MessageListener();
246+
testServer.setListener(listener);
247+
248+
String hostname = InetAddress.getLocalHost().getHostName();
249+
250+
when(mockExec.getAppVersion()).thenReturn("test-app-version");
251+
when(mockExec.getExecutorId()).thenReturn("test-executor-id");
252+
253+
try (Conductor conductor = builder.build()) {
254+
conductor.start();
255+
assertTrue(listener.openLatch.await(5, TimeUnit.SECONDS), "open latch timed out");
256+
257+
ExecutorInfoRequest req = new ExecutorInfoRequest("12345");
258+
listener.send(req);
259+
assertTrue(listener.messageLatch.await(1000000000, TimeUnit.SECONDS), "message latch timed out");
260+
261+
JsonNode jsonNode = mapper.readTree(listener.message);
262+
assertNotNull(jsonNode);
263+
assertEquals("executor_info", jsonNode.get("type").asText());
264+
assertEquals("12345", jsonNode.get("request_id").asText());
265+
assertEquals(hostname, jsonNode.get("hostname").asText());
266+
assertEquals("test-app-version", jsonNode.get("application_version").asText());
267+
assertEquals("test-executor-id", jsonNode.get("executor_id").asText());
268+
assertNull(jsonNode.get("error_message"));
269+
270+
}
271+
}
272+
240273
@Test
241274
public void canCancel() throws Exception {
242275
MessageListener listener = new MessageListener();
@@ -564,7 +597,8 @@ public void canListQueuedWorkflows() throws Exception {
564597
.build("12345");
565598
listener.send(req);
566599
assertTrue(listener.messageLatch.await(100000000, TimeUnit.SECONDS), "message latch timed out");
567-
ArgumentCaptor<ListQueuedWorkflowsInput> inputCaptor = ArgumentCaptor.forClass(ListQueuedWorkflowsInput.class);
600+
ArgumentCaptor<ListQueuedWorkflowsInput> inputCaptor = ArgumentCaptor
601+
.forClass(ListQueuedWorkflowsInput.class);
568602
verify(mockDB).getQueuedWorkflows(inputCaptor.capture(), eq(false));
569603
ListQueuedWorkflowsInput input = inputCaptor.getValue();
570604
assertEquals(OffsetDateTime.parse("2024-06-01T12:34:56Z"), input.getStartTime());
@@ -585,7 +619,6 @@ public void canListQueuedWorkflows() throws Exception {
585619
}
586620
}
587621

588-
589622
@Test
590623
public void canGetWorkflow() throws Exception {
591624
MessageListener listener = new MessageListener();

0 commit comments

Comments
 (0)