Skip to content

Commit fc4c8f3

Browse files
authored
Single Execution (#236)
Use database to limit workflows to one executor globally. Use memory map to limit to one execution locally. Close commit holes in WF status and step outputs. Test infrastructure and tests. Fixes #226
1 parent ca597f1 commit fc4c8f3

22 files changed

Lines changed: 887 additions & 150 deletions

transact/src/main/java/dev/dbos/transact/DBOSClient.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,9 @@ public <T, E extends Exception> WorkflowHandle<T, E> enqueueWorkflow(
355355
Objects.requireNonNull(
356356
options.queueName(), "EnqueueOptions queueName must not be null"),
357357
options.deduplicationId,
358-
options.priority),
358+
options.priority,
359+
false,
360+
false),
359361
null,
360362
null,
361363
options.appVersion,
@@ -379,7 +381,7 @@ public void send(String destinationId, Object message, String topic, String idem
379381
var status =
380382
new WorkflowStatusInternal(workflowId, WorkflowState.SUCCESS)
381383
.withName("temp_workflow-send-client");
382-
systemDatabase.initWorkflowStatus(status, null);
384+
systemDatabase.initWorkflowStatus(status, null, false, false);
383385
systemDatabase.send(status.workflowId(), 0, destinationId, message, topic);
384386
}
385387

transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import org.slf4j.Logger;
1919
import org.slf4j.LoggerFactory;
2020

21-
public class NotificationsDAO {
21+
class NotificationsDAO {
2222

2323
private static final Logger logger = LoggerFactory.getLogger(NotificationsDAO.class);
2424

@@ -37,7 +37,7 @@ void speedUpPollingForTest() {
3737
dbPollingIntervalEventMs = 100;
3838
}
3939

40-
public void send(
40+
void send(
4141
String workflowUuid, int functionId, String destinationUuid, Object message, String topic)
4242
throws SQLException {
4343

@@ -96,7 +96,8 @@ public void send(
9696

9797
// Record operation result
9898
var output = new StepResult(workflowUuid, functionId, functionName);
99-
StepsDAO.recordStepResultTxn(output, startTime, conn, this.schema);
99+
StepsDAO.recordStepResultTxn(
100+
output, startTime, System.currentTimeMillis(), conn, this.schema);
100101

101102
conn.commit();
102103

@@ -111,7 +112,7 @@ public void send(
111112
}
112113
}
113114

114-
public Object recv(
115+
Object recv(
115116
String workflowUuid, int functionId, int timeoutFunctionId, String topic, Duration timeout)
116117
throws SQLException, InterruptedException {
117118

@@ -245,7 +246,8 @@ WITH oldest_entry AS (
245246
StepResult output =
246247
new StepResult(workflowUuid, functionId, functionName)
247248
.withOutput(JSONUtil.serialize(toSave));
248-
StepsDAO.recordStepResultTxn(output, startTime, conn, this.schema);
249+
StepsDAO.recordStepResultTxn(
250+
output, startTime, System.currentTimeMillis(), conn, this.schema);
249251

250252
conn.commit();
251253
return toSave;
@@ -294,8 +296,7 @@ ON CONFLICT (workflow_uuid, key, function_id)
294296
}
295297
}
296298

297-
public void setEvent(
298-
String workflowId, int functionId, String key, Object message, boolean asStep)
299+
void setEvent(String workflowId, int functionId, String key, Object message, boolean asStep)
299300
throws SQLException {
300301
if (dataSource.isClosed()) {
301302
throw new IllegalStateException("Database is closed!");
@@ -329,7 +330,8 @@ public void setEvent(
329330
if (asStep) {
330331
// Record the operation result
331332
StepResult output = new StepResult(workflowId, functionId, functionName);
332-
StepsDAO.recordStepResultTxn(output, startTime, conn, this.schema);
333+
StepsDAO.recordStepResultTxn(
334+
output, startTime, System.currentTimeMillis(), conn, this.schema);
333335
}
334336

335337
conn.commit();
@@ -342,7 +344,7 @@ public void setEvent(
342344
}
343345
}
344346

345-
public Object getEvent(
347+
Object getEvent(
346348
String targetUuid, String key, Duration timeout, GetWorkflowEventContext callerCtx)
347349
throws SQLException {
348350
if (dataSource.isClosed()) {
@@ -456,7 +458,8 @@ public Object getEvent(
456458
StepResult output =
457459
new StepResult(callerCtx.getWorkflowId(), callerCtx.getFunctionId(), functionName)
458460
.withOutput(JSONUtil.serialize(value));
459-
StepsDAO.recordStepResultTxn(dataSource, output, startTime, this.schema);
461+
StepsDAO.recordStepResultTxn(
462+
dataSource, output, startTime, System.currentTimeMillis(), this.schema);
460463
}
461464

462465
return value;

transact/src/main/java/dev/dbos/transact/database/QueuesDAO.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import org.slf4j.Logger;
1515
import org.slf4j.LoggerFactory;
1616

17-
public class QueuesDAO {
17+
class QueuesDAO {
1818
private static final Logger logger = LoggerFactory.getLogger(QueuesDAO.class);
1919

2020
private final HikariDataSource dataSource;
@@ -33,7 +33,7 @@ public class QueuesDAO {
3333
* @param appVersion The application version
3434
* @return List of workflow UUIDs that are due for execution
3535
*/
36-
public List<String> getAndStartQueuedWorkflows(Queue queue, String executorId, String appVersion)
36+
List<String> getAndStartQueuedWorkflows(Queue queue, String executorId, String appVersion)
3737
throws SQLException {
3838
if (dataSource.isClosed()) {
3939
throw new IllegalStateException("Database is closed!");
@@ -236,7 +236,7 @@ SELECT executor_id, COUNT(*) as task_count
236236
}
237237
}
238238

239-
public boolean clearQueueAssignment(String workflowId) throws SQLException {
239+
boolean clearQueueAssignment(String workflowId) throws SQLException {
240240
if (dataSource.isClosed()) {
241241
throw new IllegalStateException("Database is closed!");
242242
}

transact/src/main/java/dev/dbos/transact/database/StepsDAO.java

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package dev.dbos.transact.database;
22

33
import dev.dbos.transact.exceptions.*;
4+
import dev.dbos.transact.internal.DebugTriggers;
45
import dev.dbos.transact.json.JSONUtil;
56
import dev.dbos.transact.workflow.ErrorResult;
67
import dev.dbos.transact.workflow.StepInfo;
@@ -17,7 +18,7 @@
1718
import org.slf4j.Logger;
1819
import org.slf4j.LoggerFactory;
1920

20-
public class StepsDAO {
21+
class StepsDAO {
2122

2223
private static final Logger logger = LoggerFactory.getLogger(StepsDAO.class);
2324

@@ -29,20 +30,29 @@ public class StepsDAO {
2930
this.schema = Objects.requireNonNull(schema);
3031
}
3132

32-
public static void recordStepResultTxn(
33-
HikariDataSource dataSource, StepResult result, long startTimeEpochMs, String schema)
33+
static void recordStepResultTxn(
34+
HikariDataSource dataSource,
35+
StepResult result,
36+
long startTimeEpochMs,
37+
long endTimeEpochMs,
38+
String schema)
3439
throws SQLException {
3540
if (dataSource.isClosed()) {
3641
throw new IllegalStateException("Database is closed!");
3742
}
3843

3944
try (Connection connection = dataSource.getConnection(); ) {
40-
recordStepResultTxn(result, startTimeEpochMs, connection, schema);
45+
recordStepResultTxn(result, startTimeEpochMs, endTimeEpochMs, connection, schema);
4146
}
47+
DebugTriggers.debugTriggerPoint(DebugTriggers.DEBUG_TRIGGER_STEP_COMMIT);
4248
}
4349

44-
public static void recordStepResultTxn(
45-
StepResult result, Long startTimeEpochMs, Connection connection, String schema)
50+
static void recordStepResultTxn(
51+
StepResult result,
52+
Long startTimeEpochMs,
53+
Long endTimeEpochMs,
54+
Connection connection,
55+
String schema)
4656
throws SQLException {
4757

4858
Objects.requireNonNull(schema);
@@ -51,6 +61,7 @@ public static void recordStepResultTxn(
5161
INSERT INTO %s.operation_outputs
5262
(workflow_uuid, function_id, function_name, output, error, child_workflow_id, started_at_epoch_ms, completed_at_epoch_ms)
5363
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
64+
ON CONFLICT DO NOTHING RETURNING completed_at_epoch_ms
5465
"""
5566
.formatted(schema);
5667

@@ -78,11 +89,20 @@ public static void recordStepResultTxn(
7889
}
7990

8091
pstmt.setObject(7, startTimeEpochMs);
81-
Long endTime = startTimeEpochMs == null ? null : System.currentTimeMillis();
82-
pstmt.setObject(8, endTime);
83-
84-
pstmt.executeUpdate();
92+
pstmt.setObject(8, endTimeEpochMs);
8593

94+
try (ResultSet rs = pstmt.executeQuery()) {
95+
if (rs.next() && endTimeEpochMs != null) {
96+
long completedAt = rs.getLong("completed_at_epoch_ms");
97+
logger.warn(
98+
String.format(
99+
"Step output for %s:%d-%s was already recorded",
100+
result.workflowId(), result.stepId(), result.functionName()));
101+
if (completedAt != endTimeEpochMs) {
102+
throw new DBOSWorkflowExecutionConflictException(result.workflowId());
103+
}
104+
}
105+
}
86106
} catch (SQLException e) {
87107
logger.debug("recordStepResultTxn error", e);
88108
if ("23505".equals(e.getSQLState())) {
@@ -108,7 +128,7 @@ public static void recordStepResultTxn(
108128
* match the provided name.
109129
* @throws SQLException For other database access errors.
110130
*/
111-
public static StepResult checkStepExecutionTxn(
131+
static StepResult checkStepExecutionTxn(
112132
String workflowId, int functionId, String functionName, Connection connection, String schema)
113133
throws SQLException, DBOSWorkflowCancelledException, DBOSUnexpectedStepException {
114134

@@ -175,7 +195,7 @@ public static StepResult checkStepExecutionTxn(
175195
return recordedResult;
176196
}
177197

178-
public List<StepInfo> listWorkflowSteps(String workflowId) throws SQLException {
198+
List<StepInfo> listWorkflowSteps(String workflowId) throws SQLException {
179199

180200
if (dataSource.isClosed()) {
181201
throw new IllegalStateException("Database is closed!");
@@ -250,12 +270,12 @@ public List<StepInfo> listWorkflowSteps(String workflowId) throws SQLException {
250270
return steps;
251271
}
252272

253-
public Duration sleep(String workflowUuid, int functionId, Duration duration, boolean skipSleep)
273+
Duration sleep(String workflowUuid, int functionId, Duration duration, boolean skipSleep)
254274
throws SQLException {
255275
return StepsDAO.sleep(dataSource, workflowUuid, functionId, duration, skipSleep, this.schema);
256276
}
257277

258-
public static Duration sleep(
278+
static Duration sleep(
259279
HikariDataSource dataSource,
260280
String workflowUuid,
261281
int functionId,
@@ -298,7 +318,7 @@ public static Duration sleep(
298318
StepResult output =
299319
new StepResult(workflowUuid, functionId, functionName)
300320
.withOutput(JSONUtil.serialize(endTime));
301-
recordStepResultTxn(dataSource, output, startTime, schema);
321+
recordStepResultTxn(dataSource, output, startTime, (long) endTime, schema);
302322
} catch (DBOSWorkflowExecutionConflictException e) {
303323
logger.error("Error recording sleep", e);
304324
}

transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,29 @@ void speedUpPollingForTest() {
8585
*
8686
* @param initStatus The initial workflow status details.
8787
* @param maxRetries Optional maximum number of retries.
88+
* @param isRecoveryRequest True if this is a recovery request, indicating that this node is told
89+
* it owns the workflow even if the ID already exists
90+
* @param isDequeuedRequest True if this is a dequeue request, indicating that this node is told
91+
* it owns the workflow (provided it is in the enqueued state)
8892
* @return An object containing the current status and optionally the deadline epoch milliseconds.
8993
* @throws DBOSConflictingWorkflowException If a conflicting workflow already exists.
9094
* @throws DBOSMaxRecoveryAttemptsExceededException If the workflow exceeds max retries.
9195
*/
9296
public WorkflowInitResult initWorkflowStatus(
93-
WorkflowStatusInternal initStatus, Integer maxRetries) {
97+
WorkflowStatusInternal initStatus,
98+
Integer maxRetries,
99+
boolean isRecoveryRequest,
100+
boolean isDequeuedRequest) {
101+
102+
// This ID will be used to tell if we are the first writer of the record, or if there is an
103+
// existing one
104+
// Note that it is generated outside of the DB retry loop, in case commit acks get lost and we
105+
// do not know if we committed or not
106+
String ownerXid = UUID.randomUUID().toString();
94107
return DbRetry.call(
95108
() -> {
96-
return workflowDAO.initWorkflowStatus(initStatus, maxRetries);
109+
return workflowDAO.initWorkflowStatus(
110+
initStatus, maxRetries, isRecoveryRequest, isDequeuedRequest, ownerXid);
97111
});
98112
}
99113

@@ -163,9 +177,10 @@ public StepResult checkStepExecutionTxn(String workflowId, int functionId, Strin
163177
}
164178

165179
public void recordStepResultTxn(StepResult result, long startTime) {
180+
var et = System.currentTimeMillis();
166181
DbRetry.run(
167182
() -> {
168-
StepsDAO.recordStepResultTxn(dataSource, result, startTime, this.schema);
183+
StepsDAO.recordStepResultTxn(dataSource, result, startTime, et, this.schema);
169184
});
170185
}
171186

0 commit comments

Comments
 (0)