Skip to content

Commit bb051bc

Browse files
authored
non public method support + more tests (#246)
* enable invocation of non public methods in `DBOSInvocationHandler` (fixes #108) * Use Timeout enum in `ForkOptions.timeout` instead of Duration * class -> record * GetWorkflowEventContext * WorkflowInitResult * GetPendingWorkflowsOutput * updated workflow and events tests to have non public service interface/implementation in same file as tests fixes #237 (was already implemented but added tests) fixes #238 (was already implemented but added tests)
1 parent e4dd1f3 commit bb051bc

29 files changed

Lines changed: 1202 additions & 1143 deletions

transact/src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,21 @@ public static <T, E extends Exception> WorkflowHandle<T, E> forkWorkflow(
617617
return executor("forkWorkflow").forkWorkflow(workflowId, startStep, options);
618618
}
619619

620+
/**
621+
* Fork the workflow. Re-execute with another Id from the step provided. Steps prior to the
622+
* provided step are copied over
623+
*
624+
* @param <T> Return type of the workflow function
625+
* @param <E> Checked exception thrown by the workflow function, if any
626+
* @param workflowId Original workflow Id
627+
* @param startStep Start execution from this step. Prior steps copied over
628+
* @return handle to the workflow
629+
*/
630+
public static <T, E extends Exception> WorkflowHandle<T, E> forkWorkflow(
631+
String workflowId, int startStep) {
632+
return forkWorkflow(workflowId, startStep, new ForkOptions());
633+
}
634+
620635
/**
621636
* Retrieve a handle to a workflow, given its ID. Note that a handle is always returned, whether
622637
* the workflow exists or not; getStatus() can be used to tell the difference

transact/src/main/java/dev/dbos/transact/StartWorkflowOptions.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,14 +105,7 @@ public StartWorkflowOptions withTimeout(long value, TimeUnit unit) {
105105

106106
/** Produces a new StartWorkflowOptions that removes the timeout behavior */
107107
public StartWorkflowOptions withNoTimeout() {
108-
return new StartWorkflowOptions(
109-
this.workflowId,
110-
Timeout.none(),
111-
this.deadline,
112-
this.queueName,
113-
this.deduplicationId,
114-
this.priority,
115-
this.queuePartitionKey);
108+
return withTimeout(Timeout.none());
116109
}
117110

118111
/** Produces a new StartWorkflowOptions that overrides deadline value for the started workflow */

transact/src/main/java/dev/dbos/transact/admin/AdminServer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,8 @@ private void listWorkflows(HttpExchange exchange) throws IOException {
198198
private void listQueuedWorkflows(HttpExchange exchange) throws IOException {
199199
if (!ensurePostJson(exchange)) return;
200200

201-
var request = mapper.readValue(exchange.getRequestBody(), ListWorkflowsRequest.class);
202-
var input = request.asInput().withQueuesOnly();
201+
var request = mapper.readValue(exchange.getRequestBody(), ListQueuedWorkflowsRequest.class);
202+
var input = request.asInput();
203203
var workflows = systemDatabase.listWorkflows(input);
204204
var response = workflows.stream().map(WorkflowsOutput::of).collect(Collectors.toList());
205205
sendMappedJson(exchange, 200, response);
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package dev.dbos.transact.admin;
2+
3+
import dev.dbos.transact.workflow.ListWorkflowsInput;
4+
5+
import java.time.OffsetDateTime;
6+
import java.util.List;
7+
8+
public record ListQueuedWorkflowsRequest(
9+
String workflow_name,
10+
String start_time,
11+
String end_time,
12+
String status,
13+
String fork_from,
14+
String queue_name,
15+
Integer limit,
16+
Integer offset,
17+
Boolean sort_desc,
18+
Boolean load_input) {
19+
20+
public ListWorkflowsInput asInput() {
21+
return new ListWorkflowsInput(
22+
null, // workflow ids
23+
status != null ? List.of(status) : null,
24+
start_time != null ? OffsetDateTime.parse(start_time) : null,
25+
end_time != null ? OffsetDateTime.parse(end_time) : null,
26+
workflow_name,
27+
null, // class_name,
28+
null, // instance_name
29+
null, // app version
30+
null, // auth user
31+
limit,
32+
offset,
33+
sort_desc,
34+
null, // wf id prefix
35+
load_input,
36+
false, // load output
37+
queue_name,
38+
true, // queuesOnly: only list queued workflows
39+
null, // Executor IDs
40+
fork_from);
41+
}
42+
}

transact/src/main/java/dev/dbos/transact/admin/ListWorkflowsRequest.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,19 @@
77

88
public record ListWorkflowsRequest(
99
List<String> workflow_uuids,
10+
String workflow_name,
1011
String authenticated_user,
1112
String start_time,
1213
String end_time,
1314
String status,
1415
String application_version,
15-
String workflow_name,
16+
String fork_from,
1617
Integer limit,
1718
Integer offset,
1819
Boolean sort_desc,
1920
String workflow_id_prefix,
2021
Boolean load_input,
21-
Boolean load_output,
22-
String queue_name) {
22+
Boolean load_output) {
2323

2424
public ListWorkflowsInput asInput() {
2525
return new ListWorkflowsInput(
@@ -38,10 +38,9 @@ public ListWorkflowsInput asInput() {
3838
workflow_id_prefix,
3939
load_input,
4040
load_output,
41-
queue_name,
42-
queue_name != null ? true : false,
41+
null,
42+
false,
4343
null, // Executor IDs
44-
null // forkedFrom
45-
);
44+
fork_from);
4645
}
4746
}
Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,3 @@
11
package dev.dbos.transact.database;
22

3-
public class GetWorkflowEventContext {
4-
5-
private String workflowId;
6-
private int functionId;
7-
private int timeoutFunctionId;
8-
9-
public GetWorkflowEventContext(String workflowId, int functionId, int timeoutFunctionId) {
10-
this.workflowId = workflowId;
11-
this.functionId = functionId;
12-
this.timeoutFunctionId = timeoutFunctionId;
13-
}
14-
15-
public String getWorkflowId() {
16-
return workflowId;
17-
}
18-
19-
public int getFunctionId() {
20-
return functionId;
21-
}
22-
23-
public int getTimeoutFunctionId() {
24-
return timeoutFunctionId;
25-
}
26-
}
3+
public record GetWorkflowEventContext(String workflowId, int functionId, int timeoutFunctionId) {}

transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -362,23 +362,19 @@ Object getEvent(
362362
try (Connection conn = dataSource.getConnection()) {
363363
recordedOutput =
364364
StepsDAO.checkStepExecutionTxn(
365-
callerCtx.getWorkflowId(),
366-
callerCtx.getFunctionId(),
367-
functionName,
368-
conn,
369-
this.schema);
365+
callerCtx.workflowId(), callerCtx.functionId(), functionName, conn, this.schema);
370366
}
371367

372368
if (recordedOutput != null) {
373-
logger.debug("Replaying getEvent, id: {}, key: {}", callerCtx.getFunctionId(), key);
369+
logger.debug("Replaying getEvent, id: {}, key: {}", callerCtx.functionId(), key);
374370
if (recordedOutput.output() != null) {
375371
Object[] outputArray = JSONUtil.deserializeToArray(recordedOutput.output());
376372
return outputArray == null ? null : outputArray[0];
377373
} else {
378374
throw new RuntimeException("No output recorded in the last getEvent");
379375
}
380376
} else {
381-
logger.debug("Running getEvent, id: {}, key: {}", callerCtx.getFunctionId(), key);
377+
logger.debug("Running getEvent, id: {}, key: {}", callerCtx.functionId(), key);
382378
}
383379
}
384380

@@ -398,7 +394,8 @@ Object getEvent(
398394
.formatted(this.schema);
399395

400396
// Wait for the notification
401-
double actualTimeout = timeout.toMillis();
397+
double actualTimeout =
398+
Objects.requireNonNull(timeout, "getEvent timeout cannot be null").toMillis();
402399
var targetTime = System.currentTimeMillis() + actualTimeout;
403400
var checkedDBForSleep = false;
404401
var hasExistingNotification = false;
@@ -431,8 +428,8 @@ Object getEvent(
431428
actualTimeout =
432429
StepsDAO.sleep(
433430
dataSource,
434-
callerCtx.getWorkflowId(),
435-
callerCtx.getTimeoutFunctionId(),
431+
callerCtx.workflowId(),
432+
callerCtx.timeoutFunctionId(),
436433
timeout,
437434
true, // skip_sleep
438435
this.schema)
@@ -456,7 +453,7 @@ Object getEvent(
456453
// Record the output if it's in a workflow
457454
if (callerCtx != null) {
458455
StepResult output =
459-
new StepResult(callerCtx.getWorkflowId(), callerCtx.getFunctionId(), functionName)
456+
new StepResult(callerCtx.workflowId(), callerCtx.functionId(), functionName)
460457
.withOutput(JSONUtil.serialize(value));
461458
StepsDAO.recordStepResultTxn(
462459
dataSource, output, startTime, System.currentTimeMillis(), this.schema);

transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77
import dev.dbos.transact.workflow.ErrorResult;
88
import dev.dbos.transact.workflow.ForkOptions;
99
import dev.dbos.transact.workflow.ListWorkflowsInput;
10+
import dev.dbos.transact.workflow.Timeout;
1011
import dev.dbos.transact.workflow.WorkflowState;
1112
import dev.dbos.transact.workflow.WorkflowStatus;
1213
import dev.dbos.transact.workflow.internal.GetPendingWorkflowsOutput;
1314
import dev.dbos.transact.workflow.internal.StepResult;
1415
import dev.dbos.transact.workflow.internal.WorkflowStatusInternal;
1516

1617
import java.sql.*;
17-
import java.time.Duration;
1818
import java.time.Instant;
1919
import java.util.*;
2020

@@ -362,7 +362,7 @@ List<WorkflowStatus> listWorkflows(ListWorkflowsInput input) throws SQLException
362362
sqlBuilder.append(
363363
"""
364364
SELECT
365-
workflow_uuid, status, forked_from,
365+
workflow_uuid, status, forked_from,
366366
name, class_name, config_name,
367367
queue_name, deduplication_id, priority, queue_partition_key,
368368
executor_id, application_version, application_id,
@@ -787,6 +787,8 @@ String forkWorkflow(String originalWorkflowId, int startStep, ForkOptions option
787787
throw new IllegalStateException("Database is closed!");
788788
}
789789

790+
Objects.requireNonNull(options);
791+
790792
var status = getWorkflowStatus(originalWorkflowId);
791793
if (status == null) {
792794
throw new DBOSNonExistentWorkflowException(originalWorkflowId);
@@ -801,12 +803,12 @@ String forkWorkflow(String originalWorkflowId, int startStep, ForkOptions option
801803

802804
String applicationVersion = options.applicationVersion();
803805

804-
var timeout = options.timeout();
805-
if (timeout == null) {
806-
timeout = status.getTimeout();
807-
}
808-
if (timeout == null) {
809-
timeout = Duration.ZERO;
806+
var timeout = Objects.requireNonNullElse(options.timeout(), Timeout.inherit());
807+
Long timeoutMS = null;
808+
if (timeout instanceof Timeout.Inherit) {
809+
timeoutMS = status.timeoutMs();
810+
} else if (timeout instanceof Timeout.Explicit explicit) {
811+
timeoutMS = explicit.value().toMillis();
810812
}
811813

812814
try (Connection connection = dataSource.getConnection()) {
@@ -820,7 +822,7 @@ String forkWorkflow(String originalWorkflowId, int startStep, ForkOptions option
820822
forkedWorkflowId,
821823
status,
822824
applicationVersion,
823-
timeout.toMillis(),
825+
timeoutMS,
824826
this.schema);
825827

826828
// Copy operation outputs if starting from step > 0
@@ -845,21 +847,17 @@ private static void insertForkedWorkflowStatus(
845847
String forkedWorkflowId,
846848
WorkflowStatus originalStatus,
847849
String applicationVersion,
848-
long timeoutMs,
850+
Long timeoutMS,
849851
String schema)
850852
throws SQLException {
851853
Objects.requireNonNull(schema);
852-
long workflowDeadlineEpoch = 0;
853-
if (timeoutMs > 0) {
854-
workflowDeadlineEpoch = System.currentTimeMillis() + timeoutMs;
855-
}
856854

857855
String sql =
858856
"""
859857
INSERT INTO %s.workflow_status (
860858
workflow_uuid, status, name, class_name, config_name, application_version, application_id,
861-
authenticated_user, authenticated_roles, assumed_role, queue_name, inputs, workflow_deadline_epoch_ms, workflow_timeout_ms, forked_from
862-
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
859+
authenticated_user, authenticated_roles, assumed_role, queue_name, inputs, workflow_timeout_ms, forked_from
860+
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
863861
"""
864862
.formatted(schema);
865863

@@ -876,9 +874,8 @@ private static void insertForkedWorkflowStatus(
876874
stmt.setString(10, originalStatus.assumedRole());
877875
stmt.setString(11, Constants.DBOS_INTERNAL_QUEUE);
878876
stmt.setString(12, JSONUtil.serializeArray(originalStatus.input()));
879-
stmt.setLong(13, workflowDeadlineEpoch);
880-
stmt.setObject(14, originalStatus.timeoutMs());
881-
stmt.setString(15, originalWorkflowId);
877+
stmt.setObject(13, timeoutMS);
878+
stmt.setString(14, originalWorkflowId);
882879

883880
stmt.executeUpdate();
884881
}

transact/src/main/java/dev/dbos/transact/database/WorkflowInitResult.java

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,9 @@
22

33
import java.util.Objects;
44

5-
public class WorkflowInitResult {
6-
private String workflowId;
7-
private String status;
8-
private Long deadlineEpochMS; // Use Long for nullable number
9-
private boolean shouldExecuteOnThisExecutor;
10-
11-
public WorkflowInitResult(String id, String status, Long deadlineEpochMS, boolean shouldExecute) {
12-
this.workflowId = id;
13-
this.status = status;
14-
this.deadlineEpochMS = deadlineEpochMS;
15-
this.shouldExecuteOnThisExecutor = shouldExecute;
16-
}
17-
18-
public String getStatus() {
19-
return status;
20-
}
21-
22-
public Long getDeadlineEpochMS() {
5+
public record WorkflowInitResult(
6+
String workflowId, String status, Long deadlineEpochMS, boolean shouldExecuteOnThisExecutor) {
7+
public Long deadlineEpochMS() {
238
return Objects.requireNonNullElse(deadlineEpochMS, 0L);
249
}
25-
26-
public String getWorkflowId() {
27-
return workflowId;
28-
}
29-
30-
public boolean shouldExecuteOnThisExecutor() {
31-
return shouldExecuteOnThisExecutor;
32-
}
3310
}

0 commit comments

Comments
 (0)