Skip to content

Commit 3a23ab8

Browse files
authored
@TransactionalStep (#400)
Adds `transact-spring-txstep-starter`, a new Spring Boot auto-configuration module that brings `@TransactionalStep` to Spring applications. **What it does:** - Introduces `@TransactionalStep`, a method annotation that marks a Spring-managed method as an idempotent DBOS step. The method body runs inside a `REQUIRES_NEW` Spring transaction; the return value is written to `tx_step_outputs` **atomically** with the user's database work. On workflow retry, the recorded output is replayed without re-executing the method body. - `TransactionalStepAspect` intercepts annotated methods and delegates to `TransactionalStepFactory`, which calls `DBOS.runStep()` and uses `DataSourceUtils.getConnection()` to write step output on the same transaction-bound connection. - `TransactionalStepAutoConfiguration` wires up the factory and aspect as Spring beans, auto-configured via `AutoConfiguration.imports`. - `TransactionalStepRegistrar` scans the Spring context post-startup and calls `factory.initialize()` (creates `tx_step_outputs`) only when annotated methods are found — no DB contact for apps that don't use the annotation. - JPA support: `TransactionalStepFactory` auto-detects `JpaTransactionManager` and sets its `dataSource` property to bridge JPA transactions to `DataSourceUtils`. **Supported stacks (all covered by integration tests):** - Spring JDBC / `JdbcTemplate` - JDBI (`jdbi3-spring` transaction binding) - jOOQ (`spring-boot-starter-jooq`) - JPA / Hibernate (`spring-boot-starter-data-jpa`) This PR also fixes previously implemented step factory's handling of concurrent updates. If two executors race to run the same step, both may pass the initial `checkExecution` read before either has committed. Previously, both executors transactions would commit, with the 2nd `recordOutput` INSERT being ignored. This has been fixed so the race loser's `recordOutput` INSERT into `tx_step_outputs` hits the `(workflow_id, step_id)` unique constraint. The factory catches the resulting `23505` violation and converts it to a `StepConflictException`. The calling code catches `StepConflictException`, rolls back the REQUIRES_NEW transaction (discarding the loser's database work), then re-reads tx_step_outputs to return the winner's committed result: The outcome is idempotent from the caller's perspective: both executors return the same winner result, and only the winner's database write is committed. Also includes minor refactors to `JdbcStepFactory`, `JdbiStepFactory`, and `JooqStepFactory` to extract shared SQL into `TxStepSchema` helpers, and expands their test suites. fixes #385
1 parent 35ad670 commit 3a23ab8

24 files changed

Lines changed: 3553 additions & 101 deletions

File tree

gradle/libs.versions.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ asm = "9.9.1"
33
aspectj = "1.9.22.1"
44
assertj = "3.27.7"
55
cron-utils = "9.2.1"
6+
hibernate = "6.6.15.Final"
67
hikaricp = "7.0.2"
78
jackson = "2.21.3"
89
java-websocket = "1.6.0"
@@ -46,6 +47,7 @@ jackson-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr
4647
java-websocket = { module = "org.java-websocket:Java-WebSocket", version.ref = "java-websocket" }
4748
jaxb-api = { module = "jakarta.xml.bind:jakarta.xml.bind-api", version.ref = "jaxb-api" }
4849
jdbi-core = { module = "org.jdbi:jdbi3-core", version.ref = "jdbi" }
50+
jdbi-spring = { module = "org.jdbi:jdbi3-spring", version.ref = "jdbi" }
4951
jooq = { module = "org.jooq:jooq", version.ref = "jooq" }
5052
jspecify = { module = "org.jspecify:jspecify", version.ref = "jspecify" }
5153
junit-bom = { module = "org.junit:junit-bom", version.ref = "junit" }
@@ -62,7 +64,11 @@ postgresql = { module = "org.postgresql:postgresql", version.ref = "postgresql"
6264
rest-assured = { module = "io.rest-assured:rest-assured", version.ref = "rest-assured" }
6365
slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
6466
slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
67+
hibernate-core = { module = "org.hibernate.orm:hibernate-core", version.ref = "hibernate" }
6568
spring-aop = { module = "org.springframework:spring-aop", version.ref = "spring-framework" }
69+
spring-jdbc = { module = "org.springframework:spring-jdbc", version.ref = "spring-framework" }
70+
spring-orm = { module = "org.springframework:spring-orm", version.ref = "spring-framework" }
71+
spring-tx = { module = "org.springframework:spring-tx", version.ref = "spring-framework" }
6672
spring-boot-autoconfigure = { module = "org.springframework.boot:spring-boot-autoconfigure", version.ref = "spring-boot" }
6773
spring-boot-configuration-processor = { module = "org.springframework.boot:spring-boot-configuration-processor", version.ref = "spring-boot" }
6874
spring-boot-test = { module = "org.springframework.boot:spring-boot-test", version.ref = "spring-boot" }

settings.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ include(
44
"transact",
55
"transact-cli",
66
"transact-spring-boot-starter",
7+
"transact-spring-txstep-starter",
78
"transact-jdbi-step-factory",
89
"transact-jooq-step-factory",
910
)

transact-jdbi-step-factory/src/main/java/dev/dbos/transact/jdbi/JdbiStepFactory.java

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import dev.dbos.transact.json.DBOSSerializer;
55
import dev.dbos.transact.json.SerializationUtil;
66
import dev.dbos.transact.txstep.PostgresStepFactory;
7+
import dev.dbos.transact.txstep.TxStepSchema;
78
import dev.dbos.transact.workflow.internal.StepResult;
89

910
import java.util.Objects;
@@ -13,6 +14,7 @@
1314
import org.jdbi.v3.core.HandleCallback;
1415
import org.jdbi.v3.core.HandleConsumer;
1516
import org.jdbi.v3.core.Jdbi;
17+
import org.jdbi.v3.core.statement.UnableToExecuteStatementException;
1618

1719
/**
1820
* Runs idempotent transactional steps inside DBOS workflows using Jdbi3 {@link Handle} objects.
@@ -107,13 +109,25 @@ public JdbiStepFactory(DBOS dbos, Jdbi jdbi, String schema, DBOSSerializer seria
107109
public <R, X extends Exception> R inStep(final HandleCallback<R, X> callback, String stepName)
108110
throws X {
109111
return runTxStep(
110-
(wfId, stepId) ->
111-
jdbi.inTransaction(
112+
(wfId, stepId) -> {
113+
try {
114+
return jdbi.inTransaction(
112115
h -> {
113116
var result = callback.withHandle(h);
114117
recordOutput(h, wfId, stepId, result);
115118
return result;
116-
}),
119+
});
120+
} catch (StepConflictException e) {
121+
return checkExecution(wfId, stepId, stepName)
122+
.orElseThrow(
123+
() ->
124+
new IllegalStateException(
125+
"Conflict on tx_step_outputs but winner row not found: workflowId=%s stepId=%d stepName=%s"
126+
.formatted(wfId, stepId, stepName),
127+
e))
128+
.<R, X>toResult(serializer);
129+
}
130+
},
117131
stepName);
118132
}
119133

@@ -144,7 +158,7 @@ public <X extends Exception> void useStep(final HandleConsumer<X> callback, Stri
144158
protected Optional<StepResult> checkExecution(String workflowId, int stepId, String stepName) {
145159
return jdbi.withHandle(
146160
h ->
147-
h.createQuery(checkSql())
161+
h.createQuery(TxStepSchema.checkSql(schema))
148162
.bind(0, workflowId)
149163
.bind(1, stepId)
150164
.map(
@@ -168,10 +182,13 @@ private <R> void recordOutput(Handle handle, String workflowId, int stepId, R re
168182
@Override
169183
protected void recordError(String workflowId, int stepId, Exception exception) {
170184
var value = SerializationUtil.serializeError(exception, null, serializer);
171-
jdbi.useTransaction(
172-
h ->
173-
recordResult(
174-
h, workflowId, stepId, null, value.serializedValue(), value.serialization()));
185+
try {
186+
jdbi.useTransaction(
187+
h ->
188+
recordResult(
189+
h, workflowId, stepId, null, value.serializedValue(), value.serialization()));
190+
} catch (StepConflictException ignored) {
191+
}
175192
}
176193

177194
private void recordResult(
@@ -181,13 +198,18 @@ private void recordResult(
181198
String output,
182199
String error,
183200
String serialization) {
184-
handle
185-
.createUpdate(upsertSql())
186-
.bind(0, workflowId)
187-
.bind(1, stepId)
188-
.bind(2, output)
189-
.bind(3, error)
190-
.bind(4, serialization)
191-
.execute();
201+
try {
202+
handle
203+
.createUpdate(TxStepSchema.upsertSql(schema))
204+
.bind(0, workflowId)
205+
.bind(1, stepId)
206+
.bind(2, output)
207+
.bind(3, error)
208+
.bind(4, serialization)
209+
.execute();
210+
} catch (UnableToExecuteStatementException e) {
211+
if (isUniqueViolation(e)) throw new StepConflictException(e);
212+
throw e;
213+
}
192214
}
193215
}

transact-jdbi-step-factory/src/test/java/dev/dbos/transact/jdbi/JdbiStepFactoryTest.java

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import dev.dbos.transact.DBOS;
1010
import dev.dbos.transact.config.DBOSConfig;
1111
import dev.dbos.transact.context.WorkflowOptions;
12+
import dev.dbos.transact.database.SystemDatabase;
1213
import dev.dbos.transact.json.SerializationUtil;
1314
import dev.dbos.transact.utils.DBUtils;
1415
import dev.dbos.transact.utils.PgContainer;
@@ -18,6 +19,8 @@
1819
import java.sql.SQLException;
1920
import java.util.Objects;
2021

22+
import javax.sql.DataSource;
23+
2124
import com.zaxxer.hikari.HikariDataSource;
2225
import org.jdbi.v3.core.Handle;
2326
import org.jdbi.v3.core.Jdbi;
@@ -35,14 +38,20 @@ record TestResult(String user, int greetCount) {}
3538
TestResult readWorkflow(String user);
3639

3740
TestResult insertThenReadWorkflow(String user);
41+
42+
TestResult conflictWorkflow(String user) throws SQLException;
3843
}
3944

4045
class FactoryTestServiceImpl implements FactoryTestService {
4146

4247
private final JdbiStepFactory stepFactory;
48+
private final DataSource dataSource;
49+
final String schema;
4350

44-
public FactoryTestServiceImpl(JdbiStepFactory stepFactory) {
51+
public FactoryTestServiceImpl(JdbiStepFactory stepFactory, DataSource dataSource, String schema) {
4552
this.stepFactory = stepFactory;
53+
this.dataSource = dataSource;
54+
this.schema = schema;
4655
}
4756

4857
FactoryTestService.TestResult insertGreeting(Handle handle, String user) {
@@ -106,6 +115,38 @@ public FactoryTestService.TestResult insertThenReadWorkflow(String user) {
106115
stepFactory.useStep((Handle h) -> insertGreeting(h, user), "insertGreeting");
107116
return stepFactory.inStep((Handle h) -> readGreeting(h, user), "readGreeting");
108117
}
118+
119+
// Simulates a concurrent winner committing a result while this executor's transaction is still
120+
// open. The separate autocommit connection represents the other executor — its INSERT persists
121+
// even when JDBI rolls back the main transaction. When recordResult subsequently tries to INSERT
122+
// the same (workflowId, stepId) key, it gets a 23505 unique-constraint violation. The factory
123+
// rolls back and falls back to checkExecution to return the winner's value.
124+
FactoryTestService.TestResult conflictGreeting(
125+
Handle handle, String user, FactoryTestService.TestResult winner) throws SQLException {
126+
var wfId = Objects.requireNonNull(DBOS.workflowId());
127+
var value = SerializationUtil.serializeValue(winner, null, null);
128+
var sql =
129+
"""
130+
INSERT INTO "%s".tx_step_outputs(workflow_id, step_id, output, error, serialization)
131+
VALUES (?, 0, ?, NULL, ?)
132+
"""
133+
.formatted(schema);
134+
try (var conn2 = dataSource.getConnection();
135+
var stmt = conn2.prepareStatement(sql)) {
136+
stmt.setString(1, wfId);
137+
stmt.setString(2, value.serializedValue());
138+
stmt.setString(3, value.serialization());
139+
stmt.executeUpdate();
140+
}
141+
return insertGreeting(handle, user);
142+
}
143+
144+
@Override
145+
@Workflow
146+
public FactoryTestService.TestResult conflictWorkflow(String user) throws SQLException {
147+
var winner = new FactoryTestService.TestResult(user, 99);
148+
return stepFactory.inStep((Handle h) -> conflictGreeting(h, user, winner), "conflictStep");
149+
}
109150
}
110151

111152
public class JdbiStepFactoryTest {
@@ -134,7 +175,9 @@ void beforeEach() throws SQLException {
134175
Jdbi jdbi = Jdbi.create(dataSource);
135176
stepFactory = new JdbiStepFactory(dbos, jdbi);
136177

137-
impl = new FactoryTestServiceImpl(stepFactory);
178+
impl =
179+
new FactoryTestServiceImpl(
180+
stepFactory, dataSource, SystemDatabase.sanitizeSchema(dbosConfig.databaseSchema()));
138181
proxy = dbos.registerProxy(FactoryTestService.class, impl);
139182

140183
dbos.launch();
@@ -376,6 +419,33 @@ public void testRetryPartialMultipleSteps() throws Exception {
376419
assertTrue(txSteps.get(1).createdAt() >= relaunchTimestamp); // step 1: re-executed on retry
377420
}
378421

422+
// Two executors race to write the result for the same step. The loser detects the 23505
423+
// conflict on its INSERT, rolls back its transaction, and returns the winner's stored value.
424+
@Test
425+
public void testUpsertConflict() throws Exception {
426+
var wfid = "wf-conflict";
427+
var user = "testUser";
428+
429+
try (var _o = new WorkflowOptions(wfid).setContext()) {
430+
var result = proxy.conflictWorkflow(user);
431+
// Returns winner's sentinel value (99), not what insertGreeting would have produced (1)
432+
assertEquals(99, result.greetCount());
433+
assertEquals(user, result.user());
434+
}
435+
436+
// Main transaction was rolled back — insertGreeting's write never committed
437+
assertEquals(0, getGreetCount(user));
438+
439+
// Exactly one tx_step_outputs row containing the winner's result
440+
var rows = DBUtils.getTxStepRows(dataSource, wfid);
441+
assertEquals(1, rows.size());
442+
var row = rows.get(0);
443+
assertNotNull(row.output());
444+
assertNull(row.error());
445+
var output = SerializationUtil.deserializeValue(row.output(), row.serialization(), null);
446+
assertEquals(new FactoryTestService.TestResult(user, 99), output);
447+
}
448+
379449
@Test
380450
public void testRetryInsert() throws Exception {
381451
var timestamp = System.currentTimeMillis();

transact-jooq-step-factory/src/main/java/dev/dbos/transact/jooq/JooqStepFactory.java

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import dev.dbos.transact.json.DBOSSerializer;
55
import dev.dbos.transact.json.SerializationUtil;
66
import dev.dbos.transact.txstep.PostgresStepFactory;
7+
import dev.dbos.transact.txstep.TxStepSchema;
78
import dev.dbos.transact.workflow.internal.StepResult;
89

910
import java.util.Objects;
@@ -13,6 +14,7 @@
1314
import org.jooq.DSLContext;
1415
import org.jooq.TransactionalCallable;
1516
import org.jooq.TransactionalRunnable;
17+
import org.jooq.exception.DataAccessException;
1618

1719
/**
1820
* Runs idempotent transactional steps inside DBOS workflows using jOOQ {@link DSLContext} objects.
@@ -86,13 +88,25 @@ public JooqStepFactory(DBOS dbos, DSLContext dsl, String schema, DBOSSerializer
8688
*/
8789
public <T> T txStepResult(TransactionalCallable<T> callback, String stepName) {
8890
return runTxStep(
89-
(wfId, stepId) ->
90-
dsl.transactionResult(
91+
(wfId, stepId) -> {
92+
try {
93+
return dsl.transactionResult(
9194
trx -> {
9295
var result = callback.run(trx);
9396
recordOutput(trx, wfId, stepId, result);
9497
return result;
95-
}),
98+
});
99+
} catch (StepConflictException e) {
100+
return checkExecution(wfId, stepId, stepName)
101+
.orElseThrow(
102+
() ->
103+
new IllegalStateException(
104+
"Conflict on tx_step_outputs but winner row not found: workflowId=%s stepId=%d stepName=%s"
105+
.formatted(wfId, stepId, stepName),
106+
e))
107+
.<T, RuntimeException>toResult(serializer);
108+
}
109+
},
96110
stepName);
97111
}
98112

@@ -119,7 +133,7 @@ public void txStep(TransactionalRunnable transactional, String stepName) {
119133

120134
@Override
121135
protected Optional<StepResult> checkExecution(String workflowId, int stepId, String stepName) {
122-
return dsl.fetchOptional(checkSql(), workflowId, stepId)
136+
return dsl.fetchOptional(TxStepSchema.checkSql(schema), workflowId, stepId)
123137
.map(
124138
r ->
125139
new StepResult(
@@ -141,15 +155,18 @@ private <R> void recordOutput(Configuration trx, String workflowId, int stepId,
141155
@Override
142156
protected void recordError(String workflowId, int stepId, Exception exception) {
143157
var value = SerializationUtil.serializeError(exception, null, serializer);
144-
dsl.transaction(
145-
trx ->
146-
recordResult(
147-
trx.dsl(),
148-
workflowId,
149-
stepId,
150-
null,
151-
value.serializedValue(),
152-
value.serialization()));
158+
try {
159+
dsl.transaction(
160+
trx ->
161+
recordResult(
162+
trx.dsl(),
163+
workflowId,
164+
stepId,
165+
null,
166+
value.serializedValue(),
167+
value.serialization()));
168+
} catch (StepConflictException ignored) {
169+
}
153170
}
154171

155172
private void recordResult(
@@ -159,6 +176,11 @@ private void recordResult(
159176
String output,
160177
String error,
161178
String serialization) {
162-
ctx.execute(upsertSql(), workflowId, stepId, output, error, serialization);
179+
try {
180+
ctx.execute(TxStepSchema.upsertSql(schema), workflowId, stepId, output, error, serialization);
181+
} catch (DataAccessException e) {
182+
if (isUniqueViolation(e)) throw new StepConflictException(e);
183+
throw e;
184+
}
163185
}
164186
}

0 commit comments

Comments
 (0)