Skip to content

Commit ed275f0

Browse files
authored
patch support (#249)
1 parent 6e7bc56 commit ed275f0

8 files changed

Lines changed: 653 additions & 43 deletions

File tree

transact/src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,4 +752,38 @@ public static Optional<ExternalState> getExternalState(
752752
public static ExternalState upsertExternalState(ExternalState state) {
753753
return executor("upsertExternalState").upsertExternalState(state);
754754
}
755+
756+
/**
757+
* Marks a breaking change within a workflow. Returns true for new workflows (i.e. workflow sthat
758+
* reach this point in the workflow after the breaking change was created) and false for old
759+
* worklows (i.e. workflows that reached this point in the workflow before the breaking change was
760+
* created). The workflow should execute the new code if this method returns true, otherwise
761+
* execute the old code. Note, patching must be enabled in DBOS configuration and this method must
762+
* be called from within a workflow context.
763+
*
764+
* @param patchName the name of the patch to apply
765+
* @return true for workflows started after the breaking change, false for workflows started
766+
* before the breaking change
767+
* @throws RuntimeException if patching is not enabled in DBOS config or if called outside a
768+
* workflow
769+
*/
770+
public static boolean patch(String patchName) {
771+
return executor("patch").patch(patchName);
772+
}
773+
774+
/**
775+
* Deprecates a previously applied breaking change patch within a workflow. Safely executes
776+
* workflows containing the patch marker, but does not insert the patch marker into new workflows.
777+
* Always returns true (boolean return gives deprecatePatch the same signature as {@link #patch}).
778+
* Like {@link #patch}, patching must be enabled in DBOS configuration and this method must be
779+
* called from within a workflow context.
780+
*
781+
* @param patchName the name of the patch to deprecate
782+
* @return true (always returns true or throws)
783+
* @throws RuntimeException if patching is not enabled in DBOS config or if called outside a
784+
* workflow
785+
*/
786+
public static boolean deprecatePatch(String patchName) {
787+
return executor("deprecatePatch").deprecatePatch(patchName);
788+
}
755789
}

transact/src/main/java/dev/dbos/transact/config/DBOSConfig.java

Lines changed: 82 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,27 @@ public record DBOSConfig(
1919
String conductorDomain,
2020
String appVersion,
2121
String executorId,
22-
String databaseSchema) {
22+
String databaseSchema,
23+
boolean enablePatching) {
24+
25+
public DBOSConfig {
26+
if (appName == null || appName.isEmpty()) {
27+
throw new IllegalArgumentException("DBOSConfig.appName must not be null or empty");
28+
}
29+
if (conductorKey != null && conductorKey.isEmpty()) {
30+
throw new IllegalArgumentException("DBOSConfig.conductorKey must not be empty if specified");
31+
}
32+
if (conductorDomain != null && conductorDomain.isEmpty()) {
33+
throw new IllegalArgumentException(
34+
"DBOSConfig.conductorDomain must not be empty if specified");
35+
}
36+
if (appVersion != null && appVersion.isEmpty()) {
37+
throw new IllegalArgumentException("DBOSConfig.appVersion must not be empty if specified");
38+
}
39+
if (executorId != null && executorId.isEmpty()) {
40+
throw new IllegalArgumentException("DBOSConfig.executorId must not be empty if specified");
41+
}
42+
}
2343

2444
public static DBOSConfig defaults(String appName) {
2545
return new DBOSConfig(
@@ -28,7 +48,7 @@ public static DBOSConfig defaults(String appName) {
2848
null, false, // adminServer
2949
3001, // adminServerPort
3050
true, // migrate
31-
null, null, null, null, null);
51+
null, null, null, null, null, false);
3252
}
3353

3454
public static DBOSConfig defaultsFromEnv(String appName) {
@@ -58,7 +78,8 @@ public DBOSConfig withAppName(String v) {
5878
conductorDomain,
5979
appVersion,
6080
executorId,
61-
databaseSchema);
81+
databaseSchema,
82+
enablePatching);
6283
}
6384

6485
public DBOSConfig withDatabaseUrl(String v) {
@@ -77,7 +98,8 @@ public DBOSConfig withDatabaseUrl(String v) {
7798
conductorDomain,
7899
appVersion,
79100
executorId,
80-
databaseSchema);
101+
databaseSchema,
102+
enablePatching);
81103
}
82104

83105
public DBOSConfig withDbUser(String v) {
@@ -96,7 +118,8 @@ public DBOSConfig withDbUser(String v) {
96118
conductorDomain,
97119
appVersion,
98120
executorId,
99-
databaseSchema);
121+
databaseSchema,
122+
enablePatching);
100123
}
101124

102125
public DBOSConfig withDbPassword(String v) {
@@ -115,7 +138,8 @@ public DBOSConfig withDbPassword(String v) {
115138
conductorDomain,
116139
appVersion,
117140
executorId,
118-
databaseSchema);
141+
databaseSchema,
142+
enablePatching);
119143
}
120144

121145
public DBOSConfig withMaximumPoolSize(int v) {
@@ -134,7 +158,8 @@ public DBOSConfig withMaximumPoolSize(int v) {
134158
conductorDomain,
135159
appVersion,
136160
executorId,
137-
databaseSchema);
161+
databaseSchema,
162+
enablePatching);
138163
}
139164

140165
public DBOSConfig withConnectionTimeout(int v) {
@@ -153,7 +178,8 @@ public DBOSConfig withConnectionTimeout(int v) {
153178
conductorDomain,
154179
appVersion,
155180
executorId,
156-
databaseSchema);
181+
databaseSchema,
182+
enablePatching);
157183
}
158184

159185
public DBOSConfig withDataSource(HikariDataSource v) {
@@ -172,7 +198,8 @@ public DBOSConfig withDataSource(HikariDataSource v) {
172198
conductorDomain,
173199
appVersion,
174200
executorId,
175-
databaseSchema);
201+
databaseSchema,
202+
enablePatching);
176203
}
177204

178205
public DBOSConfig withAdminServer(boolean v) {
@@ -191,7 +218,8 @@ public DBOSConfig withAdminServer(boolean v) {
191218
conductorDomain,
192219
appVersion,
193220
executorId,
194-
databaseSchema);
221+
databaseSchema,
222+
enablePatching);
195223
}
196224

197225
public DBOSConfig withAdminServerPort(int v) {
@@ -210,7 +238,8 @@ public DBOSConfig withAdminServerPort(int v) {
210238
conductorDomain,
211239
appVersion,
212240
executorId,
213-
databaseSchema);
241+
databaseSchema,
242+
enablePatching);
214243
}
215244

216245
public DBOSConfig withMigrate(boolean v) {
@@ -229,7 +258,8 @@ public DBOSConfig withMigrate(boolean v) {
229258
conductorDomain,
230259
appVersion,
231260
executorId,
232-
databaseSchema);
261+
databaseSchema,
262+
enablePatching);
233263
}
234264

235265
public DBOSConfig withConductorKey(String v) {
@@ -248,7 +278,8 @@ public DBOSConfig withConductorKey(String v) {
248278
conductorDomain,
249279
appVersion,
250280
executorId,
251-
databaseSchema);
281+
databaseSchema,
282+
enablePatching);
252283
}
253284

254285
public DBOSConfig withConductorDomain(String v) {
@@ -267,7 +298,8 @@ public DBOSConfig withConductorDomain(String v) {
267298
v,
268299
appVersion,
269300
executorId,
270-
databaseSchema);
301+
databaseSchema,
302+
enablePatching);
271303
}
272304

273305
public DBOSConfig withAppVersion(String v) {
@@ -286,7 +318,8 @@ public DBOSConfig withAppVersion(String v) {
286318
conductorDomain,
287319
v,
288320
executorId,
289-
databaseSchema);
321+
databaseSchema,
322+
enablePatching);
290323
}
291324

292325
public DBOSConfig withExecutorId(String v) {
@@ -305,7 +338,8 @@ public DBOSConfig withExecutorId(String v) {
305338
conductorDomain,
306339
appVersion,
307340
v,
308-
databaseSchema);
341+
databaseSchema,
342+
enablePatching);
309343
}
310344

311345
public DBOSConfig withDatabaseSchema(String v) {
@@ -324,6 +358,35 @@ public DBOSConfig withDatabaseSchema(String v) {
324358
conductorDomain,
325359
appVersion,
326360
executorId,
361+
v,
362+
enablePatching);
363+
}
364+
365+
public DBOSConfig withEnablePatching() {
366+
return this.withEnablePatching(true);
367+
}
368+
369+
public DBOSConfig withDisablePatching() {
370+
return this.withEnablePatching(false);
371+
}
372+
373+
public DBOSConfig withEnablePatching(boolean v) {
374+
return new DBOSConfig(
375+
appName,
376+
databaseUrl,
377+
dbUser,
378+
dbPassword,
379+
maximumPoolSize,
380+
connectionTimeout,
381+
dataSource,
382+
adminServer,
383+
adminServerPort,
384+
migrate,
385+
conductorKey,
386+
conductorDomain,
387+
appVersion,
388+
executorId,
389+
databaseSchema,
327390
v);
328391
}
329392

@@ -338,7 +401,7 @@ public DBOSConfig disableAdminServer() {
338401
// Override toString to mask the DB password
339402
@Override
340403
public String toString() {
341-
return "DBOSConfig[appName=%s, databaseUrl=%s, dbUser=%s, dbPassword=***, maximumPoolSize=%d, connectionTimeout=%d, dataSource=%s, adminServer=%s, adminServerPort=%d, migrate=%s, conductorKey=%s, conductorDomain=%s, appVersion=%s, executorId=%s, dbSchema=%s]"
404+
return "DBOSConfig[appName=%s, databaseUrl=%s, dbUser=%s, dbPassword=***, maximumPoolSize=%d, connectionTimeout=%d, dataSource=%s, adminServer=%s, adminServerPort=%d, migrate=%s, conductorKey=%s, conductorDomain=%s, appVersion=%s, executorId=%s, dbSchema=%s, enablePatching=%s]"
342405
.formatted(
343406
appName,
344407
databaseUrl,
@@ -353,6 +416,7 @@ public String toString() {
353416
conductorDomain,
354417
appVersion,
355418
executorId,
356-
databaseSchema);
419+
databaseSchema,
420+
enablePatching);
357421
}
358422
}

transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,10 @@ public WorkflowInitResult initWorkflowStatus(
100100
boolean isRecoveryRequest,
101101
boolean isDequeuedRequest) {
102102

103-
// This ID will be used to tell if we are the first writer of the record, or if there is an
104-
// existing one
105-
// Note that it is generated outside of the DB retry loop, in case commit acks get lost and we
106-
// do not know if we committed or not
103+
// This ID will be used to tell if we are the first writer of the record, or if
104+
// there is an existing one.
105+
// Note that it is generated outside of the DB retry loop, in case commit acks
106+
// get lost and we do not know if we committed or not
107107
String ownerXid = UUID.randomUUID().toString();
108108
return DbRetry.call(
109109
() -> {
@@ -442,6 +442,58 @@ SELECT function_name, COUNT(*) as count
442442
});
443443
}
444444

445+
private String getCheckpointName(Connection conn, String workflowId, int functionId)
446+
throws SQLException {
447+
var sql =
448+
"""
449+
SELECT function_name
450+
FROM %s.operation_outputs
451+
WHERE workflow_uuid = ? AND function_id = ?
452+
"""
453+
.formatted(this.schema);
454+
455+
try (var ps = conn.prepareStatement(sql)) {
456+
ps.setString(1, workflowId);
457+
ps.setInt(2, functionId);
458+
try (var rs = ps.executeQuery()) {
459+
if (rs.next()) {
460+
return rs.getString("function_name");
461+
} else {
462+
return null;
463+
}
464+
}
465+
}
466+
}
467+
468+
public boolean patch(String workflowId, int functionId, String patchName) {
469+
Objects.requireNonNull(patchName, "patchName cannot be null");
470+
return DbRetry.call(
471+
() -> {
472+
try (Connection conn = dataSource.getConnection()) {
473+
var checkpointName = getCheckpointName(conn, workflowId, functionId);
474+
if (checkpointName == null) {
475+
var output = new StepResult(workflowId, functionId, patchName);
476+
StepsDAO.recordStepResultTxn(
477+
output, System.currentTimeMillis(), null, conn, this.schema);
478+
return true;
479+
} else {
480+
return patchName.equals(checkpointName);
481+
}
482+
}
483+
});
484+
}
485+
486+
public boolean deprecatePatch(String workflowId, int functionId, String patchName) {
487+
Objects.requireNonNull(patchName, "patchName cannot be null");
488+
return DbRetry.call(
489+
() -> {
490+
try (Connection conn = dataSource.getConnection()) {
491+
var checkpointName = getCheckpointName(conn, workflowId, functionId);
492+
return patchName.equals(checkpointName);
493+
}
494+
});
495+
}
496+
445497
// package public helper for test purposes
446498
Connection getSysDBConnection() throws SQLException {
447499
return dataSource.getConnection();

0 commit comments

Comments
 (0)