diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 95d4cf67..03d80885 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -3,6 +3,7 @@ asm = "9.9.1" aspectj = "1.9.22.1" assertj = "3.27.7" cron-utils = "9.2.1" +hibernate = "6.6.15.Final" hikaricp = "7.0.2" jackson = "2.21.3" java-websocket = "1.6.0" @@ -46,6 +47,7 @@ jackson-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr java-websocket = { module = "org.java-websocket:Java-WebSocket", version.ref = "java-websocket" } jaxb-api = { module = "jakarta.xml.bind:jakarta.xml.bind-api", version.ref = "jaxb-api" } jdbi-core = { module = "org.jdbi:jdbi3-core", version.ref = "jdbi" } +jdbi-spring = { module = "org.jdbi:jdbi3-spring", version.ref = "jdbi" } jooq = { module = "org.jooq:jooq", version.ref = "jooq" } jspecify = { module = "org.jspecify:jspecify", version.ref = "jspecify" } junit-bom = { module = "org.junit:junit-bom", version.ref = "junit" } @@ -62,7 +64,11 @@ postgresql = { module = "org.postgresql:postgresql", version.ref = "postgresql" rest-assured = { module = "io.rest-assured:rest-assured", version.ref = "rest-assured" } slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" } +hibernate-core = { module = "org.hibernate.orm:hibernate-core", version.ref = "hibernate" } spring-aop = { module = "org.springframework:spring-aop", version.ref = "spring-framework" } +spring-jdbc = { module = "org.springframework:spring-jdbc", version.ref = "spring-framework" } +spring-orm = { module = "org.springframework:spring-orm", version.ref = "spring-framework" } +spring-tx = { module = "org.springframework:spring-tx", version.ref = "spring-framework" } spring-boot-autoconfigure = { module = "org.springframework.boot:spring-boot-autoconfigure", version.ref = "spring-boot" } spring-boot-configuration-processor = { module = "org.springframework.boot:spring-boot-configuration-processor", version.ref = "spring-boot" } spring-boot-test = { module = "org.springframework.boot:spring-boot-test", version.ref = "spring-boot" } diff --git a/settings.gradle.kts b/settings.gradle.kts index af6f2a87..329a5171 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -4,6 +4,7 @@ include( "transact", "transact-cli", "transact-spring-boot-starter", + "transact-spring-txstep-starter", "transact-jdbi-step-factory", "transact-jooq-step-factory", ) diff --git a/transact-jdbi-step-factory/src/main/java/dev/dbos/transact/jdbi/JdbiStepFactory.java b/transact-jdbi-step-factory/src/main/java/dev/dbos/transact/jdbi/JdbiStepFactory.java index 779f737c..48dfeae4 100644 --- a/transact-jdbi-step-factory/src/main/java/dev/dbos/transact/jdbi/JdbiStepFactory.java +++ b/transact-jdbi-step-factory/src/main/java/dev/dbos/transact/jdbi/JdbiStepFactory.java @@ -4,6 +4,7 @@ import dev.dbos.transact.json.DBOSSerializer; import dev.dbos.transact.json.SerializationUtil; import dev.dbos.transact.txstep.PostgresStepFactory; +import dev.dbos.transact.txstep.TxStepSchema; import dev.dbos.transact.workflow.internal.StepResult; import java.util.Objects; @@ -13,6 +14,7 @@ import org.jdbi.v3.core.HandleCallback; import org.jdbi.v3.core.HandleConsumer; import org.jdbi.v3.core.Jdbi; +import org.jdbi.v3.core.statement.UnableToExecuteStatementException; /** * Runs idempotent transactional steps inside DBOS workflows using Jdbi3 {@link Handle} objects. @@ -107,13 +109,25 @@ public JdbiStepFactory(DBOS dbos, Jdbi jdbi, String schema, DBOSSerializer seria public R inStep(final HandleCallback callback, String stepName) throws X { return runTxStep( - (wfId, stepId) -> - jdbi.inTransaction( + (wfId, stepId) -> { + try { + return jdbi.inTransaction( h -> { var result = callback.withHandle(h); recordOutput(h, wfId, stepId, result); return result; - }), + }); + } catch (StepConflictException e) { + return checkExecution(wfId, stepId, stepName) + .orElseThrow( + () -> + new IllegalStateException( + "Conflict on tx_step_outputs but winner row not found: workflowId=%s stepId=%d stepName=%s" + .formatted(wfId, stepId, stepName), + e)) + .toResult(serializer); + } + }, stepName); } @@ -144,7 +158,7 @@ public void useStep(final HandleConsumer callback, Stri protected Optional checkExecution(String workflowId, int stepId, String stepName) { return jdbi.withHandle( h -> - h.createQuery(checkSql()) + h.createQuery(TxStepSchema.checkSql(schema)) .bind(0, workflowId) .bind(1, stepId) .map( @@ -168,10 +182,13 @@ private void recordOutput(Handle handle, String workflowId, int stepId, R re @Override protected void recordError(String workflowId, int stepId, Exception exception) { var value = SerializationUtil.serializeError(exception, null, serializer); - jdbi.useTransaction( - h -> - recordResult( - h, workflowId, stepId, null, value.serializedValue(), value.serialization())); + try { + jdbi.useTransaction( + h -> + recordResult( + h, workflowId, stepId, null, value.serializedValue(), value.serialization())); + } catch (StepConflictException ignored) { + } } private void recordResult( @@ -181,13 +198,18 @@ private void recordResult( String output, String error, String serialization) { - handle - .createUpdate(upsertSql()) - .bind(0, workflowId) - .bind(1, stepId) - .bind(2, output) - .bind(3, error) - .bind(4, serialization) - .execute(); + try { + handle + .createUpdate(TxStepSchema.upsertSql(schema)) + .bind(0, workflowId) + .bind(1, stepId) + .bind(2, output) + .bind(3, error) + .bind(4, serialization) + .execute(); + } catch (UnableToExecuteStatementException e) { + if (isUniqueViolation(e)) throw new StepConflictException(e); + throw e; + } } } diff --git a/transact-jdbi-step-factory/src/test/java/dev/dbos/transact/jdbi/JdbiStepFactoryTest.java b/transact-jdbi-step-factory/src/test/java/dev/dbos/transact/jdbi/JdbiStepFactoryTest.java index 2e101468..c5dde413 100644 --- a/transact-jdbi-step-factory/src/test/java/dev/dbos/transact/jdbi/JdbiStepFactoryTest.java +++ b/transact-jdbi-step-factory/src/test/java/dev/dbos/transact/jdbi/JdbiStepFactoryTest.java @@ -9,6 +9,7 @@ import dev.dbos.transact.DBOS; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.context.WorkflowOptions; +import dev.dbos.transact.database.SystemDatabase; import dev.dbos.transact.json.SerializationUtil; import dev.dbos.transact.utils.DBUtils; import dev.dbos.transact.utils.PgContainer; @@ -18,6 +19,8 @@ import java.sql.SQLException; import java.util.Objects; +import javax.sql.DataSource; + import com.zaxxer.hikari.HikariDataSource; import org.jdbi.v3.core.Handle; import org.jdbi.v3.core.Jdbi; @@ -35,14 +38,20 @@ record TestResult(String user, int greetCount) {} TestResult readWorkflow(String user); TestResult insertThenReadWorkflow(String user); + + TestResult conflictWorkflow(String user) throws SQLException; } class FactoryTestServiceImpl implements FactoryTestService { private final JdbiStepFactory stepFactory; + private final DataSource dataSource; + final String schema; - public FactoryTestServiceImpl(JdbiStepFactory stepFactory) { + public FactoryTestServiceImpl(JdbiStepFactory stepFactory, DataSource dataSource, String schema) { this.stepFactory = stepFactory; + this.dataSource = dataSource; + this.schema = schema; } FactoryTestService.TestResult insertGreeting(Handle handle, String user) { @@ -106,6 +115,38 @@ public FactoryTestService.TestResult insertThenReadWorkflow(String user) { stepFactory.useStep((Handle h) -> insertGreeting(h, user), "insertGreeting"); return stepFactory.inStep((Handle h) -> readGreeting(h, user), "readGreeting"); } + + // Simulates a concurrent winner committing a result while this executor's transaction is still + // open. The separate autocommit connection represents the other executor — its INSERT persists + // even when JDBI rolls back the main transaction. When recordResult subsequently tries to INSERT + // the same (workflowId, stepId) key, it gets a 23505 unique-constraint violation. The factory + // rolls back and falls back to checkExecution to return the winner's value. + FactoryTestService.TestResult conflictGreeting( + Handle handle, String user, FactoryTestService.TestResult winner) throws SQLException { + var wfId = Objects.requireNonNull(DBOS.workflowId()); + var value = SerializationUtil.serializeValue(winner, null, null); + var sql = + """ + INSERT INTO "%s".tx_step_outputs(workflow_id, step_id, output, error, serialization) + VALUES (?, 0, ?, NULL, ?) + """ + .formatted(schema); + try (var conn2 = dataSource.getConnection(); + var stmt = conn2.prepareStatement(sql)) { + stmt.setString(1, wfId); + stmt.setString(2, value.serializedValue()); + stmt.setString(3, value.serialization()); + stmt.executeUpdate(); + } + return insertGreeting(handle, user); + } + + @Override + @Workflow + public FactoryTestService.TestResult conflictWorkflow(String user) throws SQLException { + var winner = new FactoryTestService.TestResult(user, 99); + return stepFactory.inStep((Handle h) -> conflictGreeting(h, user, winner), "conflictStep"); + } } public class JdbiStepFactoryTest { @@ -134,7 +175,9 @@ void beforeEach() throws SQLException { Jdbi jdbi = Jdbi.create(dataSource); stepFactory = new JdbiStepFactory(dbos, jdbi); - impl = new FactoryTestServiceImpl(stepFactory); + impl = + new FactoryTestServiceImpl( + stepFactory, dataSource, SystemDatabase.sanitizeSchema(dbosConfig.databaseSchema())); proxy = dbos.registerProxy(FactoryTestService.class, impl); dbos.launch(); @@ -376,6 +419,33 @@ public void testRetryPartialMultipleSteps() throws Exception { assertTrue(txSteps.get(1).createdAt() >= relaunchTimestamp); // step 1: re-executed on retry } + // Two executors race to write the result for the same step. The loser detects the 23505 + // conflict on its INSERT, rolls back its transaction, and returns the winner's stored value. + @Test + public void testUpsertConflict() throws Exception { + var wfid = "wf-conflict"; + var user = "testUser"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + var result = proxy.conflictWorkflow(user); + // Returns winner's sentinel value (99), not what insertGreeting would have produced (1) + assertEquals(99, result.greetCount()); + assertEquals(user, result.user()); + } + + // Main transaction was rolled back — insertGreeting's write never committed + assertEquals(0, getGreetCount(user)); + + // Exactly one tx_step_outputs row containing the winner's result + var rows = DBUtils.getTxStepRows(dataSource, wfid); + assertEquals(1, rows.size()); + var row = rows.get(0); + assertNotNull(row.output()); + assertNull(row.error()); + var output = SerializationUtil.deserializeValue(row.output(), row.serialization(), null); + assertEquals(new FactoryTestService.TestResult(user, 99), output); + } + @Test public void testRetryInsert() throws Exception { var timestamp = System.currentTimeMillis(); diff --git a/transact-jooq-step-factory/src/main/java/dev/dbos/transact/jooq/JooqStepFactory.java b/transact-jooq-step-factory/src/main/java/dev/dbos/transact/jooq/JooqStepFactory.java index 225fb851..5e70cbd1 100644 --- a/transact-jooq-step-factory/src/main/java/dev/dbos/transact/jooq/JooqStepFactory.java +++ b/transact-jooq-step-factory/src/main/java/dev/dbos/transact/jooq/JooqStepFactory.java @@ -4,6 +4,7 @@ import dev.dbos.transact.json.DBOSSerializer; import dev.dbos.transact.json.SerializationUtil; import dev.dbos.transact.txstep.PostgresStepFactory; +import dev.dbos.transact.txstep.TxStepSchema; import dev.dbos.transact.workflow.internal.StepResult; import java.util.Objects; @@ -13,6 +14,7 @@ import org.jooq.DSLContext; import org.jooq.TransactionalCallable; import org.jooq.TransactionalRunnable; +import org.jooq.exception.DataAccessException; /** * Runs idempotent transactional steps inside DBOS workflows using jOOQ {@link DSLContext} objects. @@ -86,13 +88,25 @@ public JooqStepFactory(DBOS dbos, DSLContext dsl, String schema, DBOSSerializer */ public T txStepResult(TransactionalCallable callback, String stepName) { return runTxStep( - (wfId, stepId) -> - dsl.transactionResult( + (wfId, stepId) -> { + try { + return dsl.transactionResult( trx -> { var result = callback.run(trx); recordOutput(trx, wfId, stepId, result); return result; - }), + }); + } catch (StepConflictException e) { + return checkExecution(wfId, stepId, stepName) + .orElseThrow( + () -> + new IllegalStateException( + "Conflict on tx_step_outputs but winner row not found: workflowId=%s stepId=%d stepName=%s" + .formatted(wfId, stepId, stepName), + e)) + .toResult(serializer); + } + }, stepName); } @@ -119,7 +133,7 @@ public void txStep(TransactionalRunnable transactional, String stepName) { @Override protected Optional checkExecution(String workflowId, int stepId, String stepName) { - return dsl.fetchOptional(checkSql(), workflowId, stepId) + return dsl.fetchOptional(TxStepSchema.checkSql(schema), workflowId, stepId) .map( r -> new StepResult( @@ -141,15 +155,18 @@ private void recordOutput(Configuration trx, String workflowId, int stepId, @Override protected void recordError(String workflowId, int stepId, Exception exception) { var value = SerializationUtil.serializeError(exception, null, serializer); - dsl.transaction( - trx -> - recordResult( - trx.dsl(), - workflowId, - stepId, - null, - value.serializedValue(), - value.serialization())); + try { + dsl.transaction( + trx -> + recordResult( + trx.dsl(), + workflowId, + stepId, + null, + value.serializedValue(), + value.serialization())); + } catch (StepConflictException ignored) { + } } private void recordResult( @@ -159,6 +176,11 @@ private void recordResult( String output, String error, String serialization) { - ctx.execute(upsertSql(), workflowId, stepId, output, error, serialization); + try { + ctx.execute(TxStepSchema.upsertSql(schema), workflowId, stepId, output, error, serialization); + } catch (DataAccessException e) { + if (isUniqueViolation(e)) throw new StepConflictException(e); + throw e; + } } } diff --git a/transact-jooq-step-factory/src/test/java/dev/dbos/transact/jooq/JooqStepFactoryTest.java b/transact-jooq-step-factory/src/test/java/dev/dbos/transact/jooq/JooqStepFactoryTest.java index 2f5846ff..6c52140a 100644 --- a/transact-jooq-step-factory/src/test/java/dev/dbos/transact/jooq/JooqStepFactoryTest.java +++ b/transact-jooq-step-factory/src/test/java/dev/dbos/transact/jooq/JooqStepFactoryTest.java @@ -9,6 +9,7 @@ import dev.dbos.transact.DBOS; import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.context.WorkflowOptions; +import dev.dbos.transact.database.SystemDatabase; import dev.dbos.transact.json.SerializationUtil; import dev.dbos.transact.utils.DBUtils; import dev.dbos.transact.utils.PgContainer; @@ -18,6 +19,8 @@ import java.sql.SQLException; import java.util.Objects; +import javax.sql.DataSource; + import com.zaxxer.hikari.HikariDataSource; import org.jooq.DSLContext; import org.jooq.SQLDialect; @@ -36,14 +39,20 @@ record TestResult(String user, int greetCount) {} TestResult readWorkflow(String user); TestResult insertThenReadWorkflow(String user); + + TestResult conflictWorkflow(String user) throws SQLException; } class FactoryTestServiceImpl implements FactoryTestService { private final JooqStepFactory stepFactory; + private final DataSource dataSource; + final String schema; - public FactoryTestServiceImpl(JooqStepFactory stepFactory) { + public FactoryTestServiceImpl(JooqStepFactory stepFactory, DataSource dataSource, String schema) { this.stepFactory = stepFactory; + this.dataSource = dataSource; + this.schema = schema; } FactoryTestService.TestResult insertGreeting(DSLContext ctx, String user) { @@ -96,6 +105,39 @@ public FactoryTestService.TestResult insertThenReadWorkflow(String user) { stepFactory.txStep(ctx -> insertGreeting(ctx.dsl(), user), "insertGreeting"); return stepFactory.txStepResult(ctx -> readGreeting(ctx.dsl(), user), "readGreeting"); } + + // Simulates a concurrent winner committing a result while this executor's transaction is still + // open. The separate autocommit connection represents the other executor — its INSERT persists + // even when jOOQ rolls back the main transaction. When recordResult subsequently tries to INSERT + // the same (workflowId, stepId) key, it gets a 23505 unique-constraint violation. The factory + // rolls back and falls back to checkExecution to return the winner's value. + FactoryTestService.TestResult conflictGreeting( + DSLContext ctx, String user, FactoryTestService.TestResult winner) throws SQLException { + var wfId = Objects.requireNonNull(DBOS.workflowId()); + var value = SerializationUtil.serializeValue(winner, null, null); + var sql = + """ + INSERT INTO "%s".tx_step_outputs(workflow_id, step_id, output, error, serialization) + VALUES (?, 0, ?, NULL, ?) + """ + .formatted(schema); + try (var conn2 = dataSource.getConnection(); + var stmt = conn2.prepareStatement(sql)) { + stmt.setString(1, wfId); + stmt.setString(2, value.serializedValue()); + stmt.setString(3, value.serialization()); + stmt.executeUpdate(); + } + return insertGreeting(ctx, user); + } + + @Override + @Workflow + public FactoryTestService.TestResult conflictWorkflow(String user) throws SQLException { + var winner = new FactoryTestService.TestResult(user, 99); + return stepFactory.txStepResult( + ctx -> conflictGreeting(ctx.dsl(), user, winner), "conflictStep"); + } } public class JooqStepFactoryTest { @@ -123,7 +165,9 @@ void beforeEach() throws SQLException { DSLContext dsl = DSL.using(dataSource, SQLDialect.POSTGRES); stepFactory = new JooqStepFactory(dbos, dsl); - impl = new FactoryTestServiceImpl(stepFactory); + impl = + new FactoryTestServiceImpl( + stepFactory, dataSource, SystemDatabase.sanitizeSchema(dbosConfig.databaseSchema())); proxy = dbos.registerProxy(FactoryTestService.class, impl); dbos.launch(); @@ -365,6 +409,33 @@ public void testRetryPartialMultipleSteps() throws Exception { assertTrue(txSteps.get(1).createdAt() >= relaunchTimestamp); // step 1: re-executed on retry } + // Two executors race to write the result for the same step. The loser detects the 23505 + // conflict on its INSERT, rolls back its transaction, and returns the winner's stored value. + @Test + public void testUpsertConflict() throws Exception { + var wfid = "wf-conflict"; + var user = "testUser"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + var result = proxy.conflictWorkflow(user); + // Returns winner's sentinel value (99), not what insertGreeting would have produced (1) + assertEquals(99, result.greetCount()); + assertEquals(user, result.user()); + } + + // Main transaction was rolled back — insertGreeting's write never committed + assertEquals(0, getGreetCount(user)); + + // Exactly one tx_step_outputs row containing the winner's result + var rows = DBUtils.getTxStepRows(dataSource, wfid); + assertEquals(1, rows.size()); + var row = rows.get(0); + assertNotNull(row.output()); + assertNull(row.error()); + var output = SerializationUtil.deserializeValue(row.output(), row.serialization(), null); + assertEquals(new FactoryTestService.TestResult(user, 99), output); + } + @Test public void testRetryInsert() throws Exception { var timestamp = System.currentTimeMillis(); diff --git a/transact-spring-txstep-starter/README.md b/transact-spring-txstep-starter/README.md new file mode 100644 index 00000000..f418e68a --- /dev/null +++ b/transact-spring-txstep-starter/README.md @@ -0,0 +1,192 @@ +# transact-spring-txstep-starter + +Spring Boot auto-configuration for the `@TransactionalStep` annotation. + +`@TransactionalStep` marks a Spring-managed method as an **idempotent transactional step** inside a DBOS workflow. Each call runs in a `REQUIRES_NEW` Spring transaction; the step's return value is written to a `tx_step_outputs` table **atomically** with the user's database work. On workflow retry, the recorded output is replayed without re-executing the method body. + +## Installation + +Add both the DBOS Spring Boot starter and this module: + +**Gradle** +```kotlin +implementation("dev.dbos:transact-spring-boot-starter:") +implementation("dev.dbos:transact-spring-txstep-starter:") +``` + +**Maven** +```xml + + dev.dbos + transact-spring-boot-starter + VERSION + + + dev.dbos + transact-spring-txstep-starter + VERSION + +``` + +## Prerequisites + +- Spring Boot 3.x or 4.x +- A PostgreSQL `DataSource` bean +- A `PlatformTransactionManager` bean (auto-configured by Spring Boot for all supported stacks) +- `dbos.application.name` property set + +## Usage + +Annotate any Spring-managed method with `@TransactionalStep`. The method must be called through a Spring proxy — inject the bean into a `@Workflow`-annotated method in another Spring bean. + +### Spring JDBC / JdbcTemplate + +No extra dependencies. Spring Boot auto-configures `DataSourceTransactionManager` and `JdbcTemplate`. + +```java +@Service +public class OrderStepService { + @Autowired JdbcTemplate jdbc; + + @TransactionalStep + public Order saveOrder(Order order) { + jdbc.update("INSERT INTO orders(id, item, qty) VALUES (?, ?, ?)", + order.id(), order.item(), order.qty()); + return order; + } +} + +@Service +public class OrderWorkflowService { + @Autowired OrderStepService steps; + + @Workflow + public Order processOrder(Order order) { + return steps.saveOrder(order); + } +} +``` + +`JdbcTemplate` routes through `DataSourceUtils` internally, so it automatically joins the `REQUIRES_NEW` transaction started by the aspect. Spring Data JDBC repositories (`CrudRepository`) work identically. + +### JDBI + +Add `jdbi3-spring` so JDBI's `SpringTransactionHandler` reuses the active Spring transaction rather than opening a separate connection: + +```kotlin +implementation("org.jdbi:jdbi3-spring:") +``` + +```java +@Configuration +public class JdbiConfig { + @Bean + public Jdbi jdbi(DataSource dataSource) throws Exception { + var factory = new JdbiFactoryBean(dataSource); + factory.afterPropertiesSet(); + return factory.getObject(); + } +} + +@Service +public class OrderStepService { + @Autowired Jdbi jdbi; + + @TransactionalStep + public Order saveOrder(Order order) { + jdbi.withHandle(h -> + h.execute("INSERT INTO orders(id, item, qty) VALUES (?, ?, ?)", + order.id(), order.item(), order.qty())); + return order; + } +} +``` + +### jOOQ + +Spring Boot auto-configures `DSLContext` with `SpringTransactionProvider` when you add `spring-boot-starter-jooq`. Set `spring.jooq.sql-dialect=POSTGRES` and inject `DSLContext` directly: + +```kotlin +implementation("org.springframework.boot:spring-boot-starter-jooq") +``` + +```java +@Service +public class OrderStepService { + @Autowired DSLContext dsl; + + @TransactionalStep + public Order saveOrder(Order order) { + dsl.execute("INSERT INTO orders(id, item, qty) VALUES (?, ?, ?)", + order.id(), order.item(), order.qty()); + return order; + } +} +``` + +`spring-boot-starter-jooq` wraps the `DataSource` in a `TransactionAwareDataSourceProxy`, so jOOQ queries execute on the Spring-transaction-bound connection. + +### JPA / Hibernate + +Spring Boot auto-configures `JpaTransactionManager` when `spring-boot-starter-data-jpa` is present. + +```kotlin +implementation("org.springframework.boot:spring-boot-starter-data-jpa") +``` + +```java +@Service +public class OrderStepService { + @Autowired OrderRepository repo; // Spring Data JPA repository + + @TransactionalStep + public Order saveOrder(Order order) { + return repo.save(order); + } +} +``` + +Direct `EntityManager` access also works via `EntityManagerFactoryUtils.getTransactionalEntityManager(emf)`. + +### MyBatis + +Spring Boot auto-configures `SqlSessionTemplate` with `DataSourceTransactionManager` via `mybatis-spring-boot-starter`. MyBatis mappers participate in Spring transactions automatically: + +```kotlin +implementation("org.mybatis.spring.boot:mybatis-spring-boot-starter:") +``` + +```java +@Mapper +public interface OrderMapper { + @Insert("INSERT INTO orders(id, item, qty) VALUES(#{id}, #{item}, #{qty})") + void insert(@Param("id") String id, @Param("item") String item, @Param("qty") int qty); +} + +@Service +public class OrderStepService { + @Autowired OrderMapper orderMapper; + + @TransactionalStep + public Order saveOrder(Order order) { + orderMapper.insert(order.id(), order.item(), order.qty()); + return order; + } +} +``` + +## Configuration + +| Property | Default | Description | +|---|---|---| +| `dbos.txstep.schema` | DBOS system schema | PostgreSQL schema for the `tx_step_outputs` table | + +The `tx_step_outputs` table is created lazily on startup — only if at least one `@TransactionalStep` method is found in the Spring context. Applications that never use the annotation incur no database contact. + +## How it works + +1. `TransactionalStepAspect` intercepts every `@TransactionalStep` call and delegates to `TransactionalStepFactory`. +2. `TransactionalStepFactory` calls `DBOS.runStep()`, which checks `tx_step_outputs` for a prior result. If one exists, it is returned immediately (idempotent replay). +3. Otherwise, a `REQUIRES_NEW` Spring transaction is started. The method body runs, and the result is written to `tx_step_outputs` using `DataSourceUtils.getConnection()` — the same connection the transaction holds. +4. The transaction commits, making the user's write and the step output record atomic. If the method throws, the transaction rolls back and the error is recorded separately so retries can replay it. +5. `TransactionalStepRegistrar` scans beans after context startup and calls `factory.initialize()` (which creates `tx_step_outputs`) only when annotated methods are found. diff --git a/transact-spring-txstep-starter/build.gradle.kts b/transact-spring-txstep-starter/build.gradle.kts new file mode 100644 index 00000000..b61cca92 --- /dev/null +++ b/transact-spring-txstep-starter/build.gradle.kts @@ -0,0 +1,133 @@ +import com.vanniktech.maven.publish.DeploymentValidation + +plugins { + id("java-library") + alias(libs.plugins.maven.publish) +} + +tasks.withType { + options.compilerArgs.add("-Xlint:unchecked") + options.compilerArgs.add("-Xlint:deprecation") + options.compilerArgs.add("-Xlint:rawtypes") + options.compilerArgs.add("-Werror") +} + +tasks.withType { + (options as StandardJavadocDocletOptions).apply { + addStringOption("Xdoclint:all,-missing", "-quiet") + encoding = "UTF-8" + } +} + +tasks.named("build") { dependsOn("javadoc") } + +dependencies { + api(project(":transact")) + compileOnly(project(":transact-spring-boot-starter")) + compileOnly(libs.spring.boot.autoconfigure) + compileOnly(libs.spring.aop) + compileOnly(libs.aspectjweaver) + compileOnly(libs.spring.tx) + compileOnly(libs.spring.jdbc) + compileOnly(libs.spring.orm) + + testImplementation(platform(libs.junit.bom)) + testImplementation(libs.junit.jupiter) + testRuntimeOnly(libs.junit.platform.launcher) + + testImplementation(project(":transact-spring-boot-starter")) + testImplementation(libs.spring.boot.test) + testImplementation(libs.spring.boot.autoconfigure) + testImplementation(libs.spring.aop) + testImplementation(libs.aspectjweaver) + testImplementation(libs.spring.tx) + testImplementation(libs.spring.jdbc) + testImplementation(libs.spring.orm) + testImplementation(libs.hibernate.core) + testImplementation(libs.jdbi.core) + testImplementation(libs.jdbi.spring) + testCompileOnly(libs.jaxb.api) + testImplementation(libs.jooq) + testImplementation(libs.assertj.core) + testImplementation(libs.testcontainers.postgresql) + testImplementation(libs.postgresql) + testImplementation(libs.hikaricp) + testRuntimeOnly(libs.logback.classic) +} + +testing { + suites { + val springBoot4Test by + registering(JvmTestSuite::class) { + sources { + java { setSrcDirs(sourceSets["test"].java.srcDirs) } + resources { setSrcDirs(sourceSets["test"].resources.srcDirs) } + } + dependencies { + implementation(project()) + implementation(project(":transact-spring-boot-starter")) + implementation(platform(libs.spring.boot4.dependencies)) + implementation(platform(libs.junit.bom)) + implementation(libs.junit.jupiter) + runtimeOnly(libs.junit.platform.launcher) + implementation(libs.spring.boot4.test) + implementation(libs.spring.boot4.autoconfigure) + implementation("org.springframework:spring-aop") + implementation(libs.aspectjweaver) + implementation("org.springframework:spring-tx") + implementation("org.springframework:spring-jdbc") + implementation("org.springframework:spring-orm") + implementation("org.hibernate.orm:hibernate-core") + implementation(libs.jdbi.core) + implementation(libs.jdbi.spring) + compileOnly(libs.jaxb.api) + implementation(libs.jooq) + implementation(libs.assertj.core) + implementation(libs.testcontainers.postgresql) + implementation(libs.postgresql) + implementation(libs.hikaricp) + runtimeOnly(libs.logback.classic) + } + } + } +} + +tasks.named("test") { dependsOn("springBoot4Test") } + +val publishingToMavenCentral = + gradle.startParameter.taskNames.any { it.contains("publishToMavenCentral") } + +mavenPublishing { + publishToMavenCentral(automaticRelease = true, validateDeployment = DeploymentValidation.NONE) + if (publishingToMavenCentral) { + signAllPublications() + } + + pom { + name.set("DBOS Transact Spring Transactional Step Starter") + description.set("Spring Boot auto-configuration for DBOS @TransactionalStep annotation") + inceptionYear.set("2025") + url.set("https://github.com/dbos-inc/dbos-transact-java") + + licenses { + license { + name.set("MIT License") + url.set("https://opensource.org/licenses/MIT") + } + } + + developers { + developer { + id.set("dbos-inc") + name.set("DBOS Inc") + email.set("support@dbos.dev") + } + } + + scm { + connection.set("scm:git:git://github.com/dbos-inc/dbos-transact-java.git") + developerConnection.set("scm:git:ssh://github.com:dbos-inc/dbos-transact-java.git") + url.set("https://github.com/dbos-inc/dbos-transact-java/tree/main") + } + } +} diff --git a/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStep.java b/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStep.java new file mode 100644 index 00000000..197f2660 --- /dev/null +++ b/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStep.java @@ -0,0 +1,34 @@ +package dev.dbos.transact.spring.txstep; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marks a Spring-managed method as an idempotent transactional step that integrates with the DBOS + * runtime. + * + *

When called inside a {@link dev.dbos.transact.workflow.Workflow @Workflow}, the method body + * runs inside a {@code REQUIRES_NEW} Spring transaction. The step output is written to the {@code + * tx_step_outputs} table atomically with the user's database work. On workflow retry, the recorded + * output is replayed without re-executing the method body. + * + *

The annotated method must be called through a Spring proxy (e.g. a self-injected reference) + * for the aspect to intercept it. + * + *

Example: + * + *

{@code
+ * @TransactionalStep
+ * public Order saveOrder(Order order) {
+ *     return repository.save(order);
+ * }
+ * }
+ */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface TransactionalStep { + /** Stable name for this step within the workflow. Defaults to the method name when left empty. */ + String name() default ""; +} diff --git a/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStepAspect.java b/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStepAspect.java new file mode 100644 index 00000000..45058fae --- /dev/null +++ b/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStepAspect.java @@ -0,0 +1,49 @@ +package dev.dbos.transact.spring.txstep; + +import dev.dbos.transact.spring.WrappedThrowableException; + +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.reflect.MethodSignature; + +/** + * AOP aspect that intercepts {@link TransactionalStep @TransactionalStep} annotated methods and + * delegates execution to {@link TransactionalStepFactory}. + * + *

This bean is registered by {@link TransactionalStepAutoConfiguration}. + */ +@Aspect +public class TransactionalStepAspect { + + private final TransactionalStepFactory factory; + + public TransactionalStepAspect(TransactionalStepFactory factory) { + this.factory = factory; + } + + @Around("@annotation(transactionalStep)") + public Object aroundTransactionalStep( + ProceedingJoinPoint pjp, TransactionalStep transactionalStep) throws Throwable { + String stepName = transactionalStep.name(); + if (stepName.isEmpty()) { + stepName = ((MethodSignature) pjp.getSignature()).getName(); + } + String resolvedName = stepName; + try { + return factory.runTransactionalStep( + () -> { + try { + return pjp.proceed(); + } catch (Exception e) { + throw e; + } catch (Throwable t) { + throw new WrappedThrowableException(t); + } + }, + resolvedName); + } catch (WrappedThrowableException e) { + throw e.getWrappedThrowable(); + } + } +} diff --git a/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStepAutoConfiguration.java b/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStepAutoConfiguration.java new file mode 100644 index 00000000..2ee8135c --- /dev/null +++ b/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStepAutoConfiguration.java @@ -0,0 +1,64 @@ +package dev.dbos.transact.spring.txstep; + +import dev.dbos.transact.DBOS; +import dev.dbos.transact.spring.DBOSAutoConfiguration; + +import javax.sql.DataSource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * Spring Boot auto-configuration for {@link TransactionalStep @TransactionalStep} support. + * + *

Activates when a {@link DBOS} bean, a {@link PlatformTransactionManager} bean, and a {@link + * DataSource} bean are all present. Creates {@link TransactionalStepFactory}, {@link + * TransactionalStepAspect}, and {@link TransactionalStepRegistrar} beans. + */ +@AutoConfiguration( + after = DBOSAutoConfiguration.class, + afterName = { + // Ensure the JDBC and JPA transaction manager auto-configurations have registered their + // PlatformTransactionManager bean definitions before our @ConditionalOnBean check runs. + // Spring Boot 3.x class names: + "org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration", + "org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration", + // Spring Boot 4.x class names (auto-configurations moved to dedicated modules): + "org.springframework.boot.jdbc.autoconfigure.DataSourceTransactionManagerAutoConfiguration", + "org.springframework.boot.hibernate.autoconfigure.HibernateJpaAutoConfiguration" + }) +@ConditionalOnBean({DBOS.class, PlatformTransactionManager.class, DataSource.class}) +@EnableConfigurationProperties(TransactionalStepProperties.class) +public class TransactionalStepAutoConfiguration { + + private static final Logger logger = + LoggerFactory.getLogger(TransactionalStepAutoConfiguration.class); + + @Bean + public TransactionalStepFactory springTransactionalStepFactory( + DBOS dbos, + DataSource dataSource, + PlatformTransactionManager txManager, + TransactionalStepProperties properties) { + logger.info( + "TransactionalStepAutoConfiguration activated; txManager={} dataSource={}", + txManager.getClass().getName(), + dataSource.getClass().getName()); + return new TransactionalStepFactory(dbos, dataSource, txManager, properties.getSchema()); + } + + @Bean + public TransactionalStepAspect transactionalStepAspect(TransactionalStepFactory factory) { + return new TransactionalStepAspect(factory); + } + + @Bean + public TransactionalStepRegistrar transactionalStepRegistrar(TransactionalStepFactory factory) { + return new TransactionalStepRegistrar(factory); + } +} diff --git a/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStepFactory.java b/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStepFactory.java new file mode 100644 index 00000000..b36112d3 --- /dev/null +++ b/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStepFactory.java @@ -0,0 +1,199 @@ +package dev.dbos.transact.spring.txstep; + +import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRED; +import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRES_NEW; + +import dev.dbos.transact.DBOS; +import dev.dbos.transact.database.SystemDatabase; +import dev.dbos.transact.execution.ThrowingSupplier; +import dev.dbos.transact.json.DBOSSerializer; +import dev.dbos.transact.json.SerializationUtil; +import dev.dbos.transact.txstep.PostgresStepFactory; +import dev.dbos.transact.txstep.TxStepSchema; +import dev.dbos.transact.workflow.internal.StepResult; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Objects; +import java.util.Optional; + +import javax.sql.DataSource; + +import org.springframework.jdbc.datasource.DataSourceUtils; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.DefaultTransactionDefinition; + +/** + * A step factory that integrates with Spring's {@link PlatformTransactionManager} to provide + * exactly-once transactional steps via the {@code @TransactionalStep} annotation. + * + *

Each step runs inside a {@code REQUIRES_NEW} Spring transaction. The step output is written to + * {@code tx_step_outputs} atomically with the user's database work, using the transaction-bound + * connection obtained from {@link DataSourceUtils#getConnection(DataSource)}. + * + *

This class is created and managed by {@code TransactionalStepAutoConfiguration}. Call {@link + * #initialize()} before processing any steps (the registrar does this automatically when it detects + * annotated methods). + */ +public class TransactionalStepFactory { + + private final DBOS dbos; + private final DataSource dataSource; + private final PlatformTransactionManager txManager; + private final String schema; + private final DBOSSerializer serializer; + + public TransactionalStepFactory( + DBOS dbos, DataSource dataSource, PlatformTransactionManager txManager, String schema) { + this.dbos = Objects.requireNonNull(dbos); + this.dataSource = Objects.requireNonNull(dataSource); + this.txManager = Objects.requireNonNull(txManager); + var config = dbos.integration().config(); + this.schema = SystemDatabase.sanitizeSchema(schema != null ? schema : config.databaseSchema()); + this.serializer = config.serializer(); + } + + /** + * Verifies the datasource is PostgreSQL and creates the {@code tx_step_outputs} table if it does + * not already exist. Called lazily by {@code TransactionalStepRegistrar} only when annotated + * methods are found — avoids any DB contact for applications that never use this starter. + */ + public void initialize() { + try (var conn = dataSource.getConnection()) { + TxStepSchema.verifyPostgres(conn); + TxStepSchema.createTable(conn, schema); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private Optional checkExecution(String workflowId, int stepId, String stepName) { + try (var conn = dataSource.getConnection(); + var stmt = conn.prepareStatement(TxStepSchema.checkSql(schema))) { + stmt.setString(1, workflowId); + stmt.setInt(2, stepId); + try (var rs = stmt.executeQuery()) { + return TxStepSchema.readResult(rs, workflowId, stepId, stepName); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private void recordOutput(Connection conn, String workflowId, int stepId, Object result) { + var value = SerializationUtil.serializeValue(result, null, serializer); + recordResult(conn, workflowId, stepId, value.serializedValue(), null, value.serialization()); + } + + private void recordError(String workflowId, int stepId, Exception exception) { + var value = SerializationUtil.serializeError(exception, null, serializer); + try (var conn = dataSource.getConnection()) { + conn.setAutoCommit(false); + try { + recordResult( + conn, workflowId, stepId, null, value.serializedValue(), value.serialization()); + conn.commit(); + } catch (PostgresStepFactory.StepConflictException ignored) { + conn.rollback(); + } catch (SQLException ex) { + conn.rollback(); + throw ex; + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private void recordResult( + Connection conn, + String workflowId, + int stepId, + String output, + String error, + String serialization) { + try (var stmt = conn.prepareStatement(TxStepSchema.upsertSql(schema))) { + stmt.setString(1, workflowId); + stmt.setInt(2, stepId); + stmt.setString(3, output); + stmt.setString(4, error); + stmt.setString(5, serialization); + stmt.executeUpdate(); + } catch (SQLException e) { + if (PostgresStepFactory.isUniqueViolation(e)) + throw new PostgresStepFactory.StepConflictException(e); + throw new RuntimeException(e); + } + } + + /** + * Runs {@code supplier} as an idempotent DBOS step inside a {@code REQUIRES_NEW} Spring + * transaction. Called by the {@code TransactionalStepAspect}. + * + *

The step output is written to {@code tx_step_outputs} atomically with the transaction + * started for the user's work. On failure the transaction is rolled back and the error is + * recorded in a separate connection so retries can replay the error without re-executing. + * + * @param supplier the step body; typically {@code () -> pjp.proceed()} from the aspect + * @param stepName stable name for the step within the workflow + */ + @SuppressWarnings("unchecked") + public Object runTransactionalStep( + ThrowingSupplier supplier, String stepName) throws E { + // If a @TransactionalStep method is called outside a workflow or inside a step, execute the + // supplier as a standard @Transactional method without any of the durable execution bookkeeping + if (!DBOS.inWorkflow() || DBOS.inStep()) { + var txDef = new DefaultTransactionDefinition(PROPAGATION_REQUIRED); + TransactionStatus status = txManager.getTransaction(txDef); + try { + Object result = supplier.execute(); + txManager.commit(status); + return result; + } catch (RuntimeException | Error e) { + // commit() can throw (e.g. JPA flush-on-commit) and Spring internally rolls back, marking + // status completed — guard to avoid IllegalTransactionStateException on second rollback + if (!status.isCompleted()) txManager.rollback(status); + throw e; + } catch (Exception e) { + if (!status.isCompleted()) txManager.commit(status); + throw (E) e; + } + } + + return dbos.runStep( + () -> { + var workflowId = Objects.requireNonNull(DBOS.workflowId()); + int stepId = Objects.requireNonNull(DBOS.stepId()); + + var prev = checkExecution(workflowId, stepId, stepName); + if (prev.isPresent()) { + return prev.get().toResult(serializer); + } + + var txDef = new DefaultTransactionDefinition(PROPAGATION_REQUIRES_NEW); + TransactionStatus status = txManager.getTransaction(txDef); + try { + Object result = supplier.execute(); + Connection conn = DataSourceUtils.getConnection(dataSource); + recordOutput(conn, workflowId, stepId, result); + txManager.commit(status); + return result; + } catch (PostgresStepFactory.StepConflictException conflict) { + txManager.rollback(status); + return checkExecution(workflowId, stepId, stepName) + .orElseThrow( + () -> + new IllegalStateException( + "Conflict on tx_step_outputs but winner row not found: workflowId=%s stepId=%d stepName=%s" + .formatted(workflowId, stepId, stepName), + conflict)) + .toResult(serializer); + } catch (Exception e) { + if (!status.isCompleted()) txManager.rollback(status); + recordError(workflowId, stepId, e); + throw (E) e; + } + }, + stepName); + } +} diff --git a/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStepProperties.java b/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStepProperties.java new file mode 100644 index 00000000..bf5df10c --- /dev/null +++ b/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStepProperties.java @@ -0,0 +1,22 @@ +package dev.dbos.transact.spring.txstep; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** Configuration properties for {@link TransactionalStep @TransactionalStep} support. */ +@ConfigurationProperties(prefix = "dbos.txstep") +public class TransactionalStepProperties { + + /** + * Schema for the {@code tx_step_outputs} table. When not set, falls back to the DBOS system + * database schema ({@code dbos.datasource.schema}, or {@code dbos} if that is also unset). + */ + private String schema; + + public String getSchema() { + return schema; + } + + public void setSchema(String schema) { + this.schema = schema; + } +} diff --git a/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStepRegistrar.java b/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStepRegistrar.java new file mode 100644 index 00000000..e8531642 --- /dev/null +++ b/transact-spring-txstep-starter/src/main/java/dev/dbos/transact/spring/txstep/TransactionalStepRegistrar.java @@ -0,0 +1,94 @@ +package dev.dbos.transact.spring.txstep; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.aop.support.AopUtils; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.core.MethodIntrospector; +import org.springframework.core.annotation.AnnotatedElementUtils; + +/** + * Scans all Spring beans after singleton initialization for {@link + * TransactionalStep @TransactionalStep} annotated methods. If at least one is found, calls {@link + * TransactionalStepFactory#initialize()} to verify PostgreSQL and create the {@code + * tx_step_outputs} table. If none are found, no database contact occurs. + * + *

Bean types are resolved without triggering instantiation: already-created singletons are + * looked up directly from the singleton registry; beans not yet instantiated (e.g. lazy beans) are + * inspected via {@link ConfigurableListableBeanFactory#getType(String, boolean)} with {@code + * allowFactoryBeanInit=false}. + * + *

This bean is registered by {@link TransactionalStepAutoConfiguration}. + */ +public class TransactionalStepRegistrar implements SmartInitializingSingleton, BeanFactoryAware { + + private static final Logger logger = LoggerFactory.getLogger(TransactionalStepRegistrar.class); + + private final TransactionalStepFactory factory; + private ConfigurableListableBeanFactory beanFactory; + + public TransactionalStepRegistrar(TransactionalStepFactory factory) { + this.factory = factory; + } + + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + if (!(beanFactory instanceof ConfigurableListableBeanFactory clbf)) { + throw new IllegalArgumentException( + "TransactionalStepRegistrar requires a ConfigurableListableBeanFactory"); + } + this.beanFactory = clbf; + } + + @Override + public void afterSingletonsInstantiated() { + String[] beanNames = beanFactory.getBeanDefinitionNames(); + logger.debug("Scanning {} bean definitions for @TransactionalStep methods", beanNames.length); + for (String beanName : beanNames) { + Class typeToCheck; + + // If the singleton is already created, use its actual class (unwrapping any AOP proxy). + // This avoids re-instantiation and correctly handles both CGLIB and JDK dynamic proxies. + Object existing = beanFactory.getSingleton(beanName); + if (existing != null) { + typeToCheck = AopUtils.getTargetClass(existing); + } else { + // Bean not yet instantiated (e.g. lazy). Resolve from the bean definition without + // creating the bean; null means the type cannot be determined statically — skip it. + typeToCheck = beanFactory.getType(beanName, false); + if (typeToCheck == null) { + logger.debug("Skipping bean '{}': type could not be determined statically", beanName); + continue; + } + } + + if (typeToCheck != null && hasTransactionalStep(typeToCheck)) { + logger.info( + "Found @TransactionalStep in bean '{}' ({}); initializing tx_step_outputs", + beanName, + typeToCheck.getName()); + factory.initialize(); + return; + } + } + logger.warn( + "No @TransactionalStep methods found in any of {} bean definitions; " + + "tx_step_outputs table will NOT be created", + beanNames.length); + } + + private static boolean hasTransactionalStep(Class targetClass) { + return !MethodIntrospector.selectMethods( + targetClass, + (MethodIntrospector.MetadataLookup) + method -> + AnnotatedElementUtils.hasAnnotation(method, TransactionalStep.class) + ? Boolean.TRUE + : null) + .isEmpty(); + } +} diff --git a/transact-spring-txstep-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/transact-spring-txstep-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..89f860df --- /dev/null +++ b/transact-spring-txstep-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +dev.dbos.transact.spring.txstep.TransactionalStepAutoConfiguration diff --git a/transact-spring-txstep-starter/src/test/java/dev/dbos/transact/spring/txstep/TransactionalStepJdbcIntegrationTest.java b/transact-spring-txstep-starter/src/test/java/dev/dbos/transact/spring/txstep/TransactionalStepJdbcIntegrationTest.java new file mode 100644 index 00000000..99e26f0b --- /dev/null +++ b/transact-spring-txstep-starter/src/test/java/dev/dbos/transact/spring/txstep/TransactionalStepJdbcIntegrationTest.java @@ -0,0 +1,339 @@ +package dev.dbos.transact.spring.txstep; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import dev.dbos.transact.DBOS; +import dev.dbos.transact.context.WorkflowOptions; +import dev.dbos.transact.database.SystemDatabase; +import dev.dbos.transact.spring.DBOSAutoConfiguration; +import dev.dbos.transact.workflow.Workflow; + +import java.sql.SQLException; + +import javax.sql.DataSource; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.transaction.PlatformTransactionManager; + +public class TransactionalStepJdbcIntegrationTest { + + record Order(String id, String item, int qty) {} + + public static class OrderStepService { + private final JdbcTemplate jdbc; + + OrderStepService(JdbcTemplate jdbc) { + this.jdbc = jdbc; + } + + @TransactionalStep + public Order placeOrder(String orderId, String item, int qty) { + jdbc.update("INSERT INTO orders(id, item, qty) VALUES (?, ?, ?)", orderId, item, qty); + return new Order(orderId, item, qty); + } + + @TransactionalStep + public Order doError(String orderId, String item, int qty) { + jdbc.update("INSERT INTO orders(id, item, qty) VALUES (?, ?, ?)", orderId, item, qty); + throw new RuntimeException("intentional failure"); + } + } + + // Outer @TransactionalStep that calls an inner @TransactionalStep via proxy + public static class OrderOuterStepService { + private final JdbcTemplate jdbc; + private final OrderStepService inner; + + OrderOuterStepService(JdbcTemplate jdbc, OrderStepService inner) { + this.jdbc = jdbc; + this.inner = inner; + } + + @TransactionalStep + public Order placeOrderWithNested( + String outerOrderId, String innerOrderId, String item, int qty) { + jdbc.update("INSERT INTO orders(id, item, qty) VALUES (?, ?, ?)", outerOrderId, item, qty); + return inner.placeOrder(innerOrderId, item, qty); + } + } + + public static class OrderWorkflowService { + private final DBOS dbos; + private final OrderStepService steps; + private final OrderOuterStepService outerSteps; + + OrderWorkflowService(DBOS dbos, OrderStepService steps, OrderOuterStepService outerSteps) { + this.dbos = dbos; + this.steps = steps; + this.outerSteps = outerSteps; + } + + @Workflow + public Order processOrder(String orderId, String item, int qty) { + return steps.placeOrder(orderId, item, qty); + } + + @Workflow + public Order triggerError(String orderId, String item, int qty) { + return steps.doError(orderId, item, qty); + } + + @Workflow + public Order processOrderWithNested( + String outerOrderId, String innerOrderId, String item, int qty) { + return outerSteps.placeOrderWithNested(outerOrderId, innerOrderId, item, qty); + } + + @Workflow + public Order processOrderViaDbosStep(String orderId, String item, int qty) { + return dbos.runStep(() -> steps.placeOrder(orderId, item, qty), "processOrderViaDbosStep"); + } + } + + @Configuration(proxyBeanMethods = false) + static class OrderConfig { + @Bean + OrderStepService orderSteps(JdbcTemplate jdbc) { + return new OrderStepService(jdbc); + } + + @Bean + OrderOuterStepService orderOuterSteps(JdbcTemplate jdbc, OrderStepService steps) { + return new OrderOuterStepService(jdbc, steps); + } + + @Bean + OrderWorkflowService orderWorkflow( + DBOS dbos, OrderStepService steps, OrderOuterStepService outerSteps) { + return new OrderWorkflowService(dbos, steps, outerSteps); + } + } + + private static ApplicationContextRunner runner(TransactionalStepTest.TestDatabase db) { + new JdbcTemplate(db.dataSource) + .execute( + "CREATE TABLE IF NOT EXISTS orders" + + " (id TEXT PRIMARY KEY, item TEXT NOT NULL, qty INT NOT NULL)"); + return new ApplicationContextRunner() + .withConfiguration( + AutoConfigurations.of( + DBOSAutoConfiguration.class, TransactionalStepAutoConfiguration.class)) + .withPropertyValues("dbos.application.name=txstep-jdbc-test") + .withBean("dataSource", DataSource.class, () -> db.dataSource) + .withBean( + "transactionManager", + PlatformTransactionManager.class, + () -> new DataSourceTransactionManager(db.dataSource)) + .withBean("jdbcTemplate", JdbcTemplate.class, () -> new JdbcTemplate(db.dataSource)) + .withUserConfiguration(OrderConfig.class); + } + + private static int orderCount(DataSource ds, String orderId) throws SQLException { + try (var conn = ds.getConnection(); + var stmt = conn.prepareStatement("SELECT COUNT(*) FROM orders WHERE id = ?")) { + stmt.setString(1, orderId); + try (var rs = stmt.executeQuery()) { + return rs.next() ? rs.getInt(1) : 0; + } + } + } + + @Test + void autoConfig_createsExpectedBeans() { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + assertThat(ctx).hasSingleBean(DBOS.class); + assertThat(ctx).hasSingleBean(TransactionalStepFactory.class); + assertThat(ctx).hasSingleBean(TransactionalStepAspect.class); + assertThat(ctx).hasSingleBean(TransactionalStepRegistrar.class); + }); + } + } + + @Test + void goldenPath() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(OrderWorkflowService.class); + var wfid = "wf-jdbc-int-golden"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat(workflow.processOrder("ord-1", "Widget", 5)) + .isEqualTo(new Order("ord-1", "Widget", 5)); + } + + assertThat(orderCount(db.dataSource, "ord-1")).isEqualTo(1); + var rows = TransactionalStepTest.getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNotNull(); + assertThat(rows.get(0).error()).isNull(); + }); + } + } + + @Test + void idempotency() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(OrderWorkflowService.class); + var wfid = "wf-jdbc-int-idem"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + workflow.processOrder("ord-2", "Gadget", 3); + } + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat(workflow.processOrder("ord-2", "Gadget", 3)) + .isEqualTo(new Order("ord-2", "Gadget", 3)); + } + + assertThat(orderCount(db.dataSource, "ord-2")).isEqualTo(1); + assertThat(TransactionalStepTest.getTxRows(db.dataSource, wfid)).hasSize(1); + }); + } + } + + @Test + void atomicityOnFailure() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(OrderWorkflowService.class); + var wfid = "wf-jdbc-int-fail"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThatThrownBy(() -> workflow.triggerError("ord-3", "Thing", 1)) + .isInstanceOf(RuntimeException.class); + } + + assertThat(orderCount(db.dataSource, "ord-3")).isEqualTo(0); + var rows = TransactionalStepTest.getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNull(); + assertThat(rows.get(0).error()).isNotNull(); + }); + } + } + + @Test + void customSchema_property_tableCreatedInCustomSchema() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .withPropertyValues("dbos.txstep.schema=custom_schema") + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + assertThat( + TransactionalStepTest.tableExists( + db.dataSource, "custom_schema", "tx_step_outputs")) + .isTrue(); + assertThat( + TransactionalStepTest.tableExists( + db.dataSource, SystemDatabase.sanitizeSchema(null), "tx_step_outputs")) + .isFalse(); + }); + } + } + + @Test + void outsideWorkflow_stepRunsAndCommits() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var steps = ctx.getBean(OrderStepService.class); + + assertThat(steps.placeOrder("ord-out-1", "Widget", 5)) + .isEqualTo(new Order("ord-out-1", "Widget", 5)); + + assertThat(orderCount(db.dataSource, "ord-out-1")).isEqualTo(1); + assertThat(TransactionalStepTest.totalTxRows(db.dataSource)).isEqualTo(0); + }); + } + } + + @Test + void outsideWorkflow_runtimeException_rollsBack() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var steps = ctx.getBean(OrderStepService.class); + + assertThatThrownBy(() -> steps.doError("ord-out-2", "Gadget", 1)) + .isInstanceOf(RuntimeException.class); + + assertThat(orderCount(db.dataSource, "ord-out-2")).isEqualTo(0); + assertThat(TransactionalStepTest.totalTxRows(db.dataSource)).isEqualTo(0); + }); + } + } + + @Test + void nestedTxStep_innerJoinsOuterTransaction() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(OrderWorkflowService.class); + var wfid = "wf-jdbc-nested"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat( + workflow.processOrderWithNested("ord-outer", "ord-inner", "Sprocket", 2)) + .isEqualTo(new Order("ord-inner", "Sprocket", 2)); + } + + assertThat(orderCount(db.dataSource, "ord-outer")).isEqualTo(1); + assertThat(orderCount(db.dataSource, "ord-inner")).isEqualTo(1); + // Only the outer step writes to tx_step_outputs; inner runs in passthrough mode + var rows = TransactionalStepTest.getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNotNull(); + assertThat(rows.get(0).error()).isNull(); + }); + } + } + + @Test + void insideDbosStep_innerJoinsOuter() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(OrderWorkflowService.class); + var wfid = "wf-jdbc-dbosstep"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat(workflow.processOrderViaDbosStep("ord-dbos", "Thing", 3)) + .isEqualTo(new Order("ord-dbos", "Thing", 3)); + } + + assertThat(orderCount(db.dataSource, "ord-dbos")).isEqualTo(1); + // Inner @TransactionalStep ran in passthrough mode — no tx_step_outputs entry + assertThat(TransactionalStepTest.getTxRows(db.dataSource, wfid)).isEmpty(); + }); + } + } +} diff --git a/transact-spring-txstep-starter/src/test/java/dev/dbos/transact/spring/txstep/TransactionalStepJdbiIntegrationTest.java b/transact-spring-txstep-starter/src/test/java/dev/dbos/transact/spring/txstep/TransactionalStepJdbiIntegrationTest.java new file mode 100644 index 00000000..6d651347 --- /dev/null +++ b/transact-spring-txstep-starter/src/test/java/dev/dbos/transact/spring/txstep/TransactionalStepJdbiIntegrationTest.java @@ -0,0 +1,353 @@ +package dev.dbos.transact.spring.txstep; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import dev.dbos.transact.DBOS; +import dev.dbos.transact.context.WorkflowOptions; +import dev.dbos.transact.database.SystemDatabase; +import dev.dbos.transact.spring.DBOSAutoConfiguration; +import dev.dbos.transact.workflow.Workflow; + +import java.sql.SQLException; + +import javax.sql.DataSource; + +import org.jdbi.v3.core.Jdbi; +import org.jdbi.v3.spring.JdbiFactoryBean; +import org.junit.jupiter.api.Test; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; + +public class TransactionalStepJdbiIntegrationTest { + + record Order(String id, String item, int qty) {} + + public static class OrderStepService { + private final Jdbi jdbi; + + OrderStepService(Jdbi jdbi) { + this.jdbi = jdbi; + } + + @TransactionalStep + public Order placeOrder(String orderId, String item, int qty) { + jdbi.withHandle( + h -> h.execute("INSERT INTO orders(id, item, qty) VALUES (?, ?, ?)", orderId, item, qty)); + return new Order(orderId, item, qty); + } + + @TransactionalStep + public Order doError(String orderId, String item, int qty) { + jdbi.withHandle( + h -> h.execute("INSERT INTO orders(id, item, qty) VALUES (?, ?, ?)", orderId, item, qty)); + throw new RuntimeException("intentional failure"); + } + } + + // Outer @TransactionalStep that calls an inner @TransactionalStep via proxy + public static class OrderOuterStepService { + private final Jdbi jdbi; + private final OrderStepService inner; + + OrderOuterStepService(Jdbi jdbi, OrderStepService inner) { + this.jdbi = jdbi; + this.inner = inner; + } + + @TransactionalStep + public Order placeOrderWithNested( + String outerOrderId, String innerOrderId, String item, int qty) { + jdbi.withHandle( + h -> + h.execute( + "INSERT INTO orders(id, item, qty) VALUES (?, ?, ?)", outerOrderId, item, qty)); + return inner.placeOrder(innerOrderId, item, qty); + } + } + + public static class OrderWorkflowService { + private final DBOS dbos; + private final OrderStepService steps; + private final OrderOuterStepService outerSteps; + + OrderWorkflowService(DBOS dbos, OrderStepService steps, OrderOuterStepService outerSteps) { + this.dbos = dbos; + this.steps = steps; + this.outerSteps = outerSteps; + } + + @Workflow + public Order processOrder(String orderId, String item, int qty) { + return steps.placeOrder(orderId, item, qty); + } + + @Workflow + public Order triggerError(String orderId, String item, int qty) { + return steps.doError(orderId, item, qty); + } + + @Workflow + public Order processOrderWithNested( + String outerOrderId, String innerOrderId, String item, int qty) { + return outerSteps.placeOrderWithNested(outerOrderId, innerOrderId, item, qty); + } + + @Workflow + public Order processOrderViaDbosStep(String orderId, String item, int qty) { + return dbos.runStep(() -> steps.placeOrder(orderId, item, qty), "processOrderViaDbosStep"); + } + } + + @Configuration(proxyBeanMethods = false) + static class JdbiInfraConfig { + @Bean + DataSourceTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource); + } + + @Bean + JdbiFactoryBean jdbi(DataSource dataSource) { + return new JdbiFactoryBean(dataSource); + } + } + + @Configuration(proxyBeanMethods = false) + static class OrderConfig { + @Bean + OrderStepService orderSteps(Jdbi jdbi) { + return new OrderStepService(jdbi); + } + + @Bean + OrderOuterStepService orderOuterSteps(Jdbi jdbi, OrderStepService steps) { + return new OrderOuterStepService(jdbi, steps); + } + + @Bean + OrderWorkflowService orderWorkflow( + DBOS dbos, OrderStepService steps, OrderOuterStepService outerSteps) { + return new OrderWorkflowService(dbos, steps, outerSteps); + } + } + + private static ApplicationContextRunner runner(TransactionalStepTest.TestDatabase db) { + new JdbcTemplate(db.dataSource) + .execute( + "CREATE TABLE IF NOT EXISTS orders" + + " (id TEXT PRIMARY KEY, item TEXT NOT NULL, qty INT NOT NULL)"); + return new ApplicationContextRunner() + .withConfiguration( + AutoConfigurations.of( + DBOSAutoConfiguration.class, TransactionalStepAutoConfiguration.class)) + .withPropertyValues("dbos.application.name=txstep-jdbi-test") + .withBean("dataSource", DataSource.class, () -> db.dataSource) + .withUserConfiguration(JdbiInfraConfig.class, OrderConfig.class); + } + + private static int orderCount(DataSource ds, String orderId) throws SQLException { + try (var conn = ds.getConnection(); + var stmt = conn.prepareStatement("SELECT COUNT(*) FROM orders WHERE id = ?")) { + stmt.setString(1, orderId); + try (var rs = stmt.executeQuery()) { + return rs.next() ? rs.getInt(1) : 0; + } + } + } + + @Test + void autoConfig_createsExpectedBeans() { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + assertThat(ctx).hasSingleBean(DBOS.class); + assertThat(ctx).hasSingleBean(TransactionalStepFactory.class); + assertThat(ctx).hasSingleBean(TransactionalStepAspect.class); + assertThat(ctx).hasSingleBean(TransactionalStepRegistrar.class); + }); + } + } + + @Test + void goldenPath() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(OrderWorkflowService.class); + var wfid = "wf-jdbi-int-golden"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat(workflow.processOrder("ord-1", "Widget", 5)) + .isEqualTo(new Order("ord-1", "Widget", 5)); + } + + assertThat(orderCount(db.dataSource, "ord-1")).isEqualTo(1); + var rows = TransactionalStepTest.getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNotNull(); + assertThat(rows.get(0).error()).isNull(); + }); + } + } + + @Test + void idempotency() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(OrderWorkflowService.class); + var wfid = "wf-jdbi-int-idem"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + workflow.processOrder("ord-2", "Gadget", 3); + } + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat(workflow.processOrder("ord-2", "Gadget", 3)) + .isEqualTo(new Order("ord-2", "Gadget", 3)); + } + + assertThat(orderCount(db.dataSource, "ord-2")).isEqualTo(1); + assertThat(TransactionalStepTest.getTxRows(db.dataSource, wfid)).hasSize(1); + }); + } + } + + @Test + void atomicityOnFailure() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(OrderWorkflowService.class); + var wfid = "wf-jdbi-int-fail"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThatThrownBy(() -> workflow.triggerError("ord-3", "Thing", 1)) + .isInstanceOf(RuntimeException.class); + } + + assertThat(orderCount(db.dataSource, "ord-3")).isEqualTo(0); + var rows = TransactionalStepTest.getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNull(); + assertThat(rows.get(0).error()).isNotNull(); + }); + } + } + + @Test + void customSchema_property_tableCreatedInCustomSchema() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .withPropertyValues("dbos.txstep.schema=custom_schema") + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + assertThat( + TransactionalStepTest.tableExists( + db.dataSource, "custom_schema", "tx_step_outputs")) + .isTrue(); + assertThat( + TransactionalStepTest.tableExists( + db.dataSource, SystemDatabase.sanitizeSchema(null), "tx_step_outputs")) + .isFalse(); + }); + } + } + + @Test + void outsideWorkflow_stepRunsAndCommits() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var steps = ctx.getBean(OrderStepService.class); + + assertThat(steps.placeOrder("ord-out-1", "Widget", 5)) + .isEqualTo(new Order("ord-out-1", "Widget", 5)); + + assertThat(orderCount(db.dataSource, "ord-out-1")).isEqualTo(1); + assertThat(TransactionalStepTest.totalTxRows(db.dataSource)).isEqualTo(0); + }); + } + } + + @Test + void outsideWorkflow_runtimeException_rollsBack() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var steps = ctx.getBean(OrderStepService.class); + + assertThatThrownBy(() -> steps.doError("ord-out-2", "Gadget", 1)) + .isInstanceOf(RuntimeException.class); + + assertThat(orderCount(db.dataSource, "ord-out-2")).isEqualTo(0); + assertThat(TransactionalStepTest.totalTxRows(db.dataSource)).isEqualTo(0); + }); + } + } + + @Test + void nestedTxStep_innerJoinsOuterTransaction() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(OrderWorkflowService.class); + var wfid = "wf-jdbi-nested"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat( + workflow.processOrderWithNested("ord-outer", "ord-inner", "Sprocket", 2)) + .isEqualTo(new Order("ord-inner", "Sprocket", 2)); + } + + assertThat(orderCount(db.dataSource, "ord-outer")).isEqualTo(1); + assertThat(orderCount(db.dataSource, "ord-inner")).isEqualTo(1); + // Only the outer step writes to tx_step_outputs; inner runs in passthrough mode + var rows = TransactionalStepTest.getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNotNull(); + assertThat(rows.get(0).error()).isNull(); + }); + } + } + + @Test + void insideDbosStep_innerJoinsOuter() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(OrderWorkflowService.class); + var wfid = "wf-jdbi-dbosstep"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat(workflow.processOrderViaDbosStep("ord-dbos", "Thing", 3)) + .isEqualTo(new Order("ord-dbos", "Thing", 3)); + } + + assertThat(orderCount(db.dataSource, "ord-dbos")).isEqualTo(1); + // Inner @TransactionalStep ran in passthrough mode — no tx_step_outputs entry + assertThat(TransactionalStepTest.getTxRows(db.dataSource, wfid)).isEmpty(); + }); + } + } +} diff --git a/transact-spring-txstep-starter/src/test/java/dev/dbos/transact/spring/txstep/TransactionalStepJooqIntegrationTest.java b/transact-spring-txstep-starter/src/test/java/dev/dbos/transact/spring/txstep/TransactionalStepJooqIntegrationTest.java new file mode 100644 index 00000000..57aa19f9 --- /dev/null +++ b/transact-spring-txstep-starter/src/test/java/dev/dbos/transact/spring/txstep/TransactionalStepJooqIntegrationTest.java @@ -0,0 +1,357 @@ +package dev.dbos.transact.spring.txstep; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import dev.dbos.transact.DBOS; +import dev.dbos.transact.context.WorkflowOptions; +import dev.dbos.transact.database.SystemDatabase; +import dev.dbos.transact.spring.DBOSAutoConfiguration; +import dev.dbos.transact.workflow.Workflow; + +import java.sql.SQLException; + +import javax.sql.DataSource; + +import org.jooq.DSLContext; +import org.jooq.SQLDialect; +import org.jooq.impl.DSL; +import org.jooq.impl.DataSourceConnectionProvider; +import org.jooq.impl.DefaultConfiguration; +import org.junit.jupiter.api.Test; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy; +import org.springframework.transaction.PlatformTransactionManager; + +public class TransactionalStepJooqIntegrationTest { + + record Order(String id, String item, int qty) {} + + public static class OrderStepService { + private final DSLContext dsl; + + OrderStepService(DSLContext dsl) { + this.dsl = dsl; + } + + @TransactionalStep + public Order placeOrder(String orderId, String item, int qty) { + dsl.execute("INSERT INTO orders(id, item, qty) VALUES (?, ?, ?)", orderId, item, qty); + return new Order(orderId, item, qty); + } + + @TransactionalStep + public Order doError(String orderId, String item, int qty) { + dsl.execute("INSERT INTO orders(id, item, qty) VALUES (?, ?, ?)", orderId, item, qty); + throw new RuntimeException("intentional failure"); + } + } + + // Outer @TransactionalStep that calls an inner @TransactionalStep via proxy + public static class OrderOuterStepService { + private final DSLContext dsl; + private final OrderStepService inner; + + OrderOuterStepService(DSLContext dsl, OrderStepService inner) { + this.dsl = dsl; + this.inner = inner; + } + + @TransactionalStep + public Order placeOrderWithNested( + String outerOrderId, String innerOrderId, String item, int qty) { + dsl.execute("INSERT INTO orders(id, item, qty) VALUES (?, ?, ?)", outerOrderId, item, qty); + return inner.placeOrder(innerOrderId, item, qty); + } + } + + public static class OrderWorkflowService { + private final DBOS dbos; + private final OrderStepService steps; + private final OrderOuterStepService outerSteps; + + OrderWorkflowService(DBOS dbos, OrderStepService steps, OrderOuterStepService outerSteps) { + this.dbos = dbos; + this.steps = steps; + this.outerSteps = outerSteps; + } + + @Workflow + public Order processOrder(String orderId, String item, int qty) { + return steps.placeOrder(orderId, item, qty); + } + + @Workflow + public Order triggerError(String orderId, String item, int qty) { + return steps.doError(orderId, item, qty); + } + + @Workflow + public Order processOrderWithNested( + String outerOrderId, String innerOrderId, String item, int qty) { + return outerSteps.placeOrderWithNested(outerOrderId, innerOrderId, item, qty); + } + + @Workflow + public Order processOrderViaDbosStep(String orderId, String item, int qty) { + return dbos.runStep(() -> steps.placeOrder(orderId, item, qty), "processOrderViaDbosStep"); + } + } + + @Configuration(proxyBeanMethods = false) + static class OrderConfig { + @Bean + OrderStepService orderSteps(DSLContext dsl) { + return new OrderStepService(dsl); + } + + @Bean + OrderOuterStepService orderOuterSteps(DSLContext dsl, OrderStepService steps) { + return new OrderOuterStepService(dsl, steps); + } + + @Bean + OrderWorkflowService orderWorkflow( + DBOS dbos, OrderStepService steps, OrderOuterStepService outerSteps) { + return new OrderWorkflowService(dbos, steps, outerSteps); + } + } + + /** + * Builds a DSLContext backed by a TransactionAwareDataSourceProxy so that JOOQ operations execute + * on the Spring-transaction-bound connection inside {@code @TransactionalStep} methods. + */ + private static DSLContext buildDsl(DataSource dataSource) { + var config = + new DefaultConfiguration() + .set(new DataSourceConnectionProvider(new TransactionAwareDataSourceProxy(dataSource))) + .set(SQLDialect.POSTGRES); + return DSL.using(config); + } + + private static ApplicationContextRunner runner(TransactionalStepTest.TestDatabase db) { + new JdbcTemplate(db.dataSource) + .execute( + "CREATE TABLE IF NOT EXISTS orders" + + " (id TEXT PRIMARY KEY, item TEXT NOT NULL, qty INT NOT NULL)"); + return new ApplicationContextRunner() + .withConfiguration( + AutoConfigurations.of( + DBOSAutoConfiguration.class, TransactionalStepAutoConfiguration.class)) + .withPropertyValues("dbos.application.name=txstep-jooq-test") + .withBean("dataSource", DataSource.class, () -> db.dataSource) + .withBean( + "transactionManager", + PlatformTransactionManager.class, + () -> new DataSourceTransactionManager(db.dataSource)) + .withBean("dslContext", DSLContext.class, () -> buildDsl(db.dataSource)) + .withUserConfiguration(OrderConfig.class); + } + + private static int orderCount(DataSource ds, String orderId) throws SQLException { + try (var conn = ds.getConnection(); + var stmt = conn.prepareStatement("SELECT COUNT(*) FROM orders WHERE id = ?")) { + stmt.setString(1, orderId); + try (var rs = stmt.executeQuery()) { + return rs.next() ? rs.getInt(1) : 0; + } + } + } + + @Test + void autoConfig_createsExpectedBeans() { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + assertThat(ctx).hasSingleBean(DBOS.class); + assertThat(ctx).hasSingleBean(TransactionalStepFactory.class); + assertThat(ctx).hasSingleBean(TransactionalStepAspect.class); + assertThat(ctx).hasSingleBean(TransactionalStepRegistrar.class); + }); + } + } + + @Test + void goldenPath() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(OrderWorkflowService.class); + var wfid = "wf-jooq-int-golden"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat(workflow.processOrder("ord-1", "Widget", 5)) + .isEqualTo(new Order("ord-1", "Widget", 5)); + } + + assertThat(orderCount(db.dataSource, "ord-1")).isEqualTo(1); + var rows = TransactionalStepTest.getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNotNull(); + assertThat(rows.get(0).error()).isNull(); + }); + } + } + + @Test + void idempotency() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(OrderWorkflowService.class); + var wfid = "wf-jooq-int-idem"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + workflow.processOrder("ord-2", "Gadget", 3); + } + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat(workflow.processOrder("ord-2", "Gadget", 3)) + .isEqualTo(new Order("ord-2", "Gadget", 3)); + } + + assertThat(orderCount(db.dataSource, "ord-2")).isEqualTo(1); + assertThat(TransactionalStepTest.getTxRows(db.dataSource, wfid)).hasSize(1); + }); + } + } + + @Test + void atomicityOnFailure() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(OrderWorkflowService.class); + var wfid = "wf-jooq-int-fail"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThatThrownBy(() -> workflow.triggerError("ord-3", "Thing", 1)) + .isInstanceOf(RuntimeException.class); + } + + assertThat(orderCount(db.dataSource, "ord-3")).isEqualTo(0); + var rows = TransactionalStepTest.getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNull(); + assertThat(rows.get(0).error()).isNotNull(); + }); + } + } + + @Test + void customSchema_property_tableCreatedInCustomSchema() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .withPropertyValues("dbos.txstep.schema=custom_schema") + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + assertThat( + TransactionalStepTest.tableExists( + db.dataSource, "custom_schema", "tx_step_outputs")) + .isTrue(); + assertThat( + TransactionalStepTest.tableExists( + db.dataSource, SystemDatabase.sanitizeSchema(null), "tx_step_outputs")) + .isFalse(); + }); + } + } + + @Test + void outsideWorkflow_stepRunsAndCommits() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var steps = ctx.getBean(OrderStepService.class); + + assertThat(steps.placeOrder("ord-out-1", "Widget", 5)) + .isEqualTo(new Order("ord-out-1", "Widget", 5)); + + assertThat(orderCount(db.dataSource, "ord-out-1")).isEqualTo(1); + assertThat(TransactionalStepTest.totalTxRows(db.dataSource)).isEqualTo(0); + }); + } + } + + @Test + void outsideWorkflow_runtimeException_rollsBack() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var steps = ctx.getBean(OrderStepService.class); + + assertThatThrownBy(() -> steps.doError("ord-out-2", "Gadget", 1)) + .isInstanceOf(RuntimeException.class); + + assertThat(orderCount(db.dataSource, "ord-out-2")).isEqualTo(0); + assertThat(TransactionalStepTest.totalTxRows(db.dataSource)).isEqualTo(0); + }); + } + } + + @Test + void nestedTxStep_innerJoinsOuterTransaction() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(OrderWorkflowService.class); + var wfid = "wf-jooq-nested"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat( + workflow.processOrderWithNested("ord-outer", "ord-inner", "Sprocket", 2)) + .isEqualTo(new Order("ord-inner", "Sprocket", 2)); + } + + assertThat(orderCount(db.dataSource, "ord-outer")).isEqualTo(1); + assertThat(orderCount(db.dataSource, "ord-inner")).isEqualTo(1); + // Only the outer step writes to tx_step_outputs; inner runs in passthrough mode + var rows = TransactionalStepTest.getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNotNull(); + assertThat(rows.get(0).error()).isNull(); + }); + } + } + + @Test + void insideDbosStep_innerJoinsOuter() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(OrderWorkflowService.class); + var wfid = "wf-jooq-dbosstep"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat(workflow.processOrderViaDbosStep("ord-dbos", "Thing", 3)) + .isEqualTo(new Order("ord-dbos", "Thing", 3)); + } + + assertThat(orderCount(db.dataSource, "ord-dbos")).isEqualTo(1); + // Inner @TransactionalStep ran in passthrough mode — no tx_step_outputs entry + assertThat(TransactionalStepTest.getTxRows(db.dataSource, wfid)).isEmpty(); + }); + } + } +} diff --git a/transact-spring-txstep-starter/src/test/java/dev/dbos/transact/spring/txstep/TransactionalStepJpaIntegrationTest.java b/transact-spring-txstep-starter/src/test/java/dev/dbos/transact/spring/txstep/TransactionalStepJpaIntegrationTest.java new file mode 100644 index 00000000..ea30a28e --- /dev/null +++ b/transact-spring-txstep-starter/src/test/java/dev/dbos/transact/spring/txstep/TransactionalStepJpaIntegrationTest.java @@ -0,0 +1,438 @@ +package dev.dbos.transact.spring.txstep; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import dev.dbos.transact.DBOS; +import dev.dbos.transact.context.WorkflowOptions; +import dev.dbos.transact.database.SystemDatabase; +import dev.dbos.transact.spring.DBOSAutoConfiguration; +import dev.dbos.transact.workflow.Workflow; + +import java.sql.SQLException; +import java.util.Properties; + +import javax.sql.DataSource; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.EntityManagerFactory; +import jakarta.persistence.Id; +import jakarta.persistence.Table; +import org.hibernate.jpa.HibernatePersistenceProvider; +import org.junit.jupiter.api.Test; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.orm.jpa.EntityManagerFactoryUtils; +import org.springframework.orm.jpa.JpaTransactionManager; +import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; +import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter; + +public class TransactionalStepJpaIntegrationTest { + + // ---- JPA entity ---- + + @Entity + @Table(name = "greetings") + public static class Greeting { + @Id private String name; + + @Column(nullable = false) + private int count; + + protected Greeting() {} + + Greeting(String name, int count) { + this.name = name; + this.count = count; + } + + int count() { + return count; + } + } + + // ---- Step bean: @TransactionalStep methods using JPA EntityManager ---- + + public static class GreetingSteps { + private final EntityManagerFactory emf; + + GreetingSteps(EntityManagerFactory emf) { + this.emf = emf; + } + + @TransactionalStep + public String doInsert(String name) { + var em = EntityManagerFactoryUtils.getTransactionalEntityManager(emf); + var g = em.find(Greeting.class, name); + if (g == null) { + em.persist(new Greeting(name, 1)); + } else { + g.count++; + } + return name; + } + + @TransactionalStep + public String doError(String name) { + var em = EntityManagerFactoryUtils.getTransactionalEntityManager(emf); + var g = em.find(Greeting.class, name); + if (g == null) { + em.persist(new Greeting(name, 1)); + } else { + g.count++; + } + throw new RuntimeException("intentional failure"); + } + + // Always persists without checking — used to trigger a commit-time PK conflict + @TransactionalStep + public String doInsertForced(String name) { + var em = EntityManagerFactoryUtils.getTransactionalEntityManager(emf); + em.persist(new Greeting(name, 99)); + return name; + } + } + + // ---- Outer step: @TransactionalStep that calls an inner @TransactionalStep via proxy ---- + + public static class GreetingOuterSteps { + private final GreetingSteps inner; + private final EntityManagerFactory emf; + + GreetingOuterSteps(GreetingSteps inner, EntityManagerFactory emf) { + this.inner = inner; + this.emf = emf; + } + + @TransactionalStep + public String insertOuterAndNested(String outerName, String innerName) { + var em = EntityManagerFactoryUtils.getTransactionalEntityManager(emf); + em.persist(new Greeting(outerName, 1)); + return inner.doInsert(innerName); + } + } + + // ---- Workflow bean: @Workflow methods calling the step bean through its Spring proxy ---- + + public static class GreetingWorkflow { + private final DBOS dbos; + private final GreetingSteps steps; + private final GreetingOuterSteps outerSteps; + + GreetingWorkflow(DBOS dbos, GreetingSteps steps, GreetingOuterSteps outerSteps) { + this.dbos = dbos; + this.steps = steps; + this.outerSteps = outerSteps; + } + + @Workflow + public String insert(String name) { + return steps.doInsert(name); + } + + @Workflow + public String error(String name) { + return steps.doError(name); + } + + @Workflow + public String insertForced(String name) { + return steps.doInsertForced(name); + } + + @Workflow + public String insertOuterAndNested(String outerName, String innerName) { + return outerSteps.insertOuterAndNested(outerName, innerName); + } + + @Workflow + public String insertViaOuterDbosStep(String name) { + return dbos.runStep(() -> steps.doInsert(name), "insertViaOuterDbosStep"); + } + } + + // ---- Infrastructure: mirrors Spring Boot's JPA auto-configuration ---- + + @Configuration(proxyBeanMethods = false) + static class JpaInfraConfig { + @Bean + LocalContainerEntityManagerFactoryBean entityManagerFactory(DataSource dataSource) { + var emfBean = new LocalContainerEntityManagerFactoryBean(); + emfBean.setDataSource(dataSource); + emfBean.setPackagesToScan("dev.dbos.transact.spring.txstep"); + emfBean.setJpaVendorAdapter(new HibernateJpaVendorAdapter()); + emfBean.setPersistenceProviderClass(HibernatePersistenceProvider.class); + var props = new Properties(); + props.put("hibernate.hbm2ddl.auto", "update"); + props.put("hibernate.dialect", "org.hibernate.dialect.PostgreSQLDialect"); + emfBean.setJpaProperties(props); + return emfBean; + } + + @Bean + JpaTransactionManager transactionManager(EntityManagerFactory emf) { + return new JpaTransactionManager(emf); + } + } + + // ---- Spring configuration registering the application beans ---- + + @Configuration(proxyBeanMethods = false) + static class GreetingConfig { + @Bean + GreetingSteps greetingSteps(EntityManagerFactory emf) { + return new GreetingSteps(emf); + } + + @Bean + GreetingOuterSteps greetingOuterSteps(GreetingSteps steps, EntityManagerFactory emf) { + return new GreetingOuterSteps(steps, emf); + } + + @Bean + GreetingWorkflow greetingWorkflow( + DBOS dbos, GreetingSteps steps, GreetingOuterSteps outerSteps) { + return new GreetingWorkflow(dbos, steps, outerSteps); + } + } + + // ---- Runner ---- + + private static ApplicationContextRunner runner(TransactionalStepTest.TestDatabase db) { + return new ApplicationContextRunner() + .withConfiguration( + AutoConfigurations.of( + DBOSAutoConfiguration.class, TransactionalStepAutoConfiguration.class)) + .withPropertyValues("dbos.application.name=txstep-jpa-test") + .withBean("dataSource", DataSource.class, () -> db.dataSource) + .withUserConfiguration(JpaInfraConfig.class, GreetingConfig.class); + } + + // ---- Tests ---- + + @Test + void autoConfig_createsExpectedBeans() { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + assertThat(ctx).hasSingleBean(DBOS.class); + assertThat(ctx).hasSingleBean(TransactionalStepFactory.class); + assertThat(ctx).hasSingleBean(TransactionalStepAspect.class); + assertThat(ctx).hasSingleBean(TransactionalStepRegistrar.class); + }); + } + } + + @Test + void goldenPath() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(GreetingWorkflow.class); + var wfid = "wf-jpa-int-golden"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat(workflow.insert("alice")).isEqualTo("alice"); + } + + assertThat(TransactionalStepTest.greetCount(db.dataSource, "alice")).isEqualTo(1); + var rows = TransactionalStepTest.getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNotNull(); + assertThat(rows.get(0).error()).isNull(); + }); + } + } + + @Test + void idempotency() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(GreetingWorkflow.class); + var wfid = "wf-jpa-int-idem"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + workflow.insert("bob"); + } + try (var _o = new WorkflowOptions(wfid).setContext()) { + var result = workflow.insert("bob"); + assertThat(result).isEqualTo("bob"); + } + + assertThat(TransactionalStepTest.greetCount(db.dataSource, "bob")).isEqualTo(1); + assertThat(TransactionalStepTest.getTxRows(db.dataSource, wfid)).hasSize(1); + }); + } + } + + @Test + void atomicityOnFailure() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(GreetingWorkflow.class); + var wfid = "wf-jpa-int-fail"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThatThrownBy(() -> workflow.error("charlie")) + .isInstanceOf(RuntimeException.class); + } + + assertThat(TransactionalStepTest.greetCount(db.dataSource, "charlie")).isEqualTo(0); + var rows = TransactionalStepTest.getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNull(); + assertThat(rows.get(0).error()).isNotNull(); + }); + } + } + + @Test + void customSchema_property_tableCreatedInCustomSchema() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .withPropertyValues("dbos.txstep.schema=custom_schema") + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + assertThat( + TransactionalStepTest.tableExists( + db.dataSource, "custom_schema", "tx_step_outputs")) + .isTrue(); + assertThat( + TransactionalStepTest.tableExists( + db.dataSource, SystemDatabase.sanitizeSchema(null), "tx_step_outputs")) + .isFalse(); + }); + } + } + + // JPA flushes on commit, so a PK conflict surfaces during txManager.commit(), not + // supplier.execute(). + // The transaction is already rolled back internally by Spring when commit() throws — the + // isCompleted() guard in the catch block prevents a second rollback attempt. + @Test + void commitTimeException_rollsBackAndRecordsError() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(GreetingWorkflow.class); + var wfid = "wf-jpa-commit-err"; + + try (var conn = db.dataSource.getConnection(); + var stmt = + conn.prepareStatement("INSERT INTO greetings(name, count) VALUES (?, ?)")) { + stmt.setString(1, "dave"); + stmt.setInt(2, 1); + stmt.executeUpdate(); + } + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThatThrownBy(() -> workflow.insertForced("dave")) + .isInstanceOf(Exception.class); + } + + assertThat(TransactionalStepTest.greetCount(db.dataSource, "dave")).isEqualTo(1); + var rows = TransactionalStepTest.getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNull(); + assertThat(rows.get(0).error()).isNotNull(); + }); + } + } + + @Test + void outsideWorkflow_stepRunsAndCommits() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var steps = ctx.getBean(GreetingSteps.class); + + assertThat(steps.doInsert("eve")).isEqualTo("eve"); + + assertThat(TransactionalStepTest.greetCount(db.dataSource, "eve")).isEqualTo(1); + assertThat(TransactionalStepTest.totalTxRows(db.dataSource)).isEqualTo(0); + }); + } + } + + @Test + void outsideWorkflow_runtimeException_rollsBack() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var steps = ctx.getBean(GreetingSteps.class); + + assertThatThrownBy(() -> steps.doError("frank")) + .isInstanceOf(RuntimeException.class); + + assertThat(TransactionalStepTest.greetCount(db.dataSource, "frank")).isEqualTo(0); + assertThat(TransactionalStepTest.totalTxRows(db.dataSource)).isEqualTo(0); + }); + } + } + + @Test + void nestedTxStep_innerJoinsOuterTransaction() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(GreetingWorkflow.class); + var wfid = "wf-jpa-nested"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat(workflow.insertOuterAndNested("grace", "heidi")).isEqualTo("heidi"); + } + + assertThat(TransactionalStepTest.greetCount(db.dataSource, "grace")).isEqualTo(1); + assertThat(TransactionalStepTest.greetCount(db.dataSource, "heidi")).isEqualTo(1); + // Only the outer step writes to tx_step_outputs; inner runs in passthrough mode + var rows = TransactionalStepTest.getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNotNull(); + assertThat(rows.get(0).error()).isNull(); + }); + } + } + + @Test + void insideDbosStep_innerJoinsOuter() throws SQLException { + try (var db = new TransactionalStepTest.TestDatabase()) { + runner(db) + .run( + ctx -> { + assertThat(ctx).hasNotFailed(); + var workflow = ctx.getBean(GreetingWorkflow.class); + var wfid = "wf-jpa-dbosstep"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat(workflow.insertViaOuterDbosStep("ivan")).isEqualTo("ivan"); + } + + assertThat(TransactionalStepTest.greetCount(db.dataSource, "ivan")).isEqualTo(1); + // Inner @TransactionalStep ran in passthrough mode — no tx_step_outputs entry + assertThat(TransactionalStepTest.getTxRows(db.dataSource, wfid)).isEmpty(); + }); + } + } +} diff --git a/transact-spring-txstep-starter/src/test/java/dev/dbos/transact/spring/txstep/TransactionalStepTest.java b/transact-spring-txstep-starter/src/test/java/dev/dbos/transact/spring/txstep/TransactionalStepTest.java new file mode 100644 index 00000000..4058f8af --- /dev/null +++ b/transact-spring-txstep-starter/src/test/java/dev/dbos/transact/spring/txstep/TransactionalStepTest.java @@ -0,0 +1,869 @@ +package dev.dbos.transact.spring.txstep; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import dev.dbos.transact.DBOS; +import dev.dbos.transact.config.DBOSConfig; +import dev.dbos.transact.context.WorkflowOptions; +import dev.dbos.transact.database.SystemDatabase; +import dev.dbos.transact.json.SerializationUtil; +import dev.dbos.transact.workflow.Workflow; + +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Semaphore; + +import javax.sql.DataSource; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.hibernate.jpa.HibernatePersistenceProvider; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.AutoClose; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.orm.jpa.JpaTransactionManager; +import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; +import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter; +import org.testcontainers.postgresql.PostgreSQLContainer; + +public class TransactionalStepTest { + + // ---- Shared Postgres container pool ---- + + private static final int POOL_SIZE = Runtime.getRuntime().availableProcessors(); + private static final BlockingQueue PG_POOL = + new ArrayBlockingQueue<>(POOL_SIZE); + private static final Semaphore PG_PERMITS = new Semaphore(POOL_SIZE); + + static { + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + var containers = new ArrayList(); + PG_POOL.drainTo(containers); + containers.forEach(PostgreSQLContainer::stop); + })); + } + + private static PostgreSQLContainer acquireContainer() { + try { + PG_PERMITS.acquire(); + var c = PG_POOL.poll(); + if (c == null) { + c = new PostgreSQLContainer("postgres:18"); + c.start(); + } + return c; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + private static void releaseContainer(PostgreSQLContainer c) { + if (!PG_POOL.offer(c)) { + c.stop(); + } + PG_PERMITS.release(); + } + + // ---- Test helper: one isolated DB per test ---- + + static class TestDatabase implements AutoCloseable { + final PostgreSQLContainer container; + final String dbName; + final String jdbcUrl; + final HikariDataSource dataSource; + + TestDatabase() { + container = acquireContainer(); + dbName = "test_" + UUID.randomUUID().toString().replace("-", ""); + jdbcUrl = container.getJdbcUrl().replaceFirst("/[^/]+$", "/" + dbName); + try (var conn = + DriverManager.getConnection( + container.getJdbcUrl(), container.getUsername(), container.getPassword()); + var stmt = conn.createStatement()) { + stmt.execute("CREATE DATABASE " + dbName); + } catch (SQLException e) { + throw new RuntimeException(e); + } + var cfg = new HikariConfig(); + cfg.setJdbcUrl(jdbcUrl); + cfg.setUsername(container.getUsername()); + cfg.setPassword(container.getPassword()); + dataSource = new HikariDataSource(cfg); + } + + DBOSConfig dbosConfig() { + return DBOSConfig.defaults("txstep-test") + .withDatabaseUrl(jdbcUrl) + .withDbUser(container.getUsername()) + .withDbPassword(container.getPassword()); + } + + @Override + public void close() { + dataSource.close(); + try (var conn = + DriverManager.getConnection( + container.getJdbcUrl(), container.getUsername(), container.getPassword()); + var stmt = conn.createStatement()) { + stmt.execute("DROP DATABASE IF EXISTS " + dbName + " WITH (FORCE)"); + } catch (SQLException e) { + throw new RuntimeException(e); + } + releaseContainer(container); + } + } + + // ---- DB query helpers ---- + + record TxRow(String workflowId, int stepId, String output, String error) {} + + static List getTxRows(DataSource ds, String workflowId) throws SQLException { + var schema = SystemDatabase.sanitizeSchema(null); + var sql = + "SELECT * FROM \"%s\".tx_step_outputs WHERE workflow_id = ? ORDER BY step_id" + .formatted(schema); + try (var conn = ds.getConnection(); + var stmt = conn.prepareStatement(sql)) { + stmt.setString(1, workflowId); + try (ResultSet rs = stmt.executeQuery()) { + var rows = new ArrayList(); + while (rs.next()) { + rows.add( + new TxRow( + rs.getString("workflow_id"), + rs.getInt("step_id"), + rs.getString("output"), + rs.getString("error"))); + } + return rows; + } + } + } + + static boolean tableExists(DataSource ds, String schema, String table) throws SQLException { + try (var conn = ds.getConnection(); + var rs = conn.getMetaData().getTables(null, schema, table, new String[] {"TABLE"})) { + return rs.next(); + } + } + + static int totalTxRows(DataSource ds) throws SQLException { + var schema = SystemDatabase.sanitizeSchema(null); + var sql = "SELECT COUNT(*) FROM \"%s\".tx_step_outputs".formatted(schema); + try (var conn = ds.getConnection(); + var stmt = conn.prepareStatement(sql); + var rs = stmt.executeQuery()) { + return rs.next() ? rs.getInt(1) : 0; + } + } + + static int greetCount(DataSource ds, String name) throws SQLException { + try (var conn = ds.getConnection(); + var stmt = conn.prepareStatement("SELECT count FROM greetings WHERE name = ?")) { + stmt.setString(1, name); + try (var rs = stmt.executeQuery()) { + return rs.next() ? rs.getInt("count") : 0; + } + } + } + + // ---- Test service shared by all variants ---- + + interface GreetingService { + String insert(String name); + + String error(String name); + + void voidStep(); + + String conflictInsert(String name) throws SQLException; + + String insertWithNestedStep(String outer, String inner); + + String insertNestedInnerRuntimeError(String outer, String inner); + + String insertNestedInnerCheckedException(String outer, String inner) throws Exception; + + String insertViaDbosStep(String name); + + String insertViaDbosStepWithRuntimeError(String name); + + String insertViaDbosStepWithCheckedException(String name) throws Exception; + } + + static class GreetingServiceImpl implements GreetingService { + private final DBOS dbos; + private final TransactionalStepFactory factory; + private final JdbcTemplate jdbc; + private final String schema; + + GreetingServiceImpl( + DBOS dbos, TransactionalStepFactory factory, JdbcTemplate jdbc, String schema) { + this.dbos = dbos; + this.factory = factory; + this.jdbc = jdbc; + this.schema = schema; + } + + @Override + @Workflow + public String insert(String name) { + return (String) + factory.runTransactionalStep( + () -> { + jdbc.update( + "INSERT INTO greetings(name, count) VALUES (?, 1)" + + " ON CONFLICT(name) DO UPDATE SET count = greetings.count + 1", + name); + return name; + }, + "insert"); + } + + @Override + @Workflow + public String error(String name) { + return (String) + factory.runTransactionalStep( + () -> { + jdbc.update( + "INSERT INTO greetings(name, count) VALUES (?, 1)" + + " ON CONFLICT(name) DO UPDATE SET count = greetings.count + 1", + name); + throw new RuntimeException("intentional failure"); + }, + "error"); + } + + @Override + @Workflow + public void voidStep() { + factory.runTransactionalStep(() -> null, "voidStep"); + } + + @Override + @Workflow + public String insertWithNestedStep(String outer, String inner) { + return (String) + factory.runTransactionalStep( + () -> { + jdbc.update( + "INSERT INTO greetings(name, count) VALUES (?, 1)" + + " ON CONFLICT(name) DO UPDATE SET count = greetings.count + 1", + outer); + // Inner step called from inside the outer step's supplier body. DBOS.inStep() is + // true here (set by DBOSExecutor.executeStep before invoking the outer lambda), + // so this takes the PROPAGATION_REQUIRED path and joins the outer transaction. + factory.runTransactionalStep( + () -> { + jdbc.update( + "INSERT INTO greetings(name, count) VALUES (?, 1)" + + " ON CONFLICT(name) DO UPDATE SET count = greetings.count + 1", + inner); + return null; + }, + "inner"); + return outer; + }, + "outer"); + } + + @Override + @Workflow + public String insertNestedInnerRuntimeError(String outer, String inner) { + return (String) + factory.runTransactionalStep( + () -> { + jdbc.update( + "INSERT INTO greetings(name, count) VALUES (?, 1)" + + " ON CONFLICT(name) DO UPDATE SET count = greetings.count + 1", + outer); + factory.runTransactionalStep( + () -> { + jdbc.update( + "INSERT INTO greetings(name, count) VALUES (?, 1)" + + " ON CONFLICT(name) DO UPDATE SET count = greetings.count + 1", + inner); + throw new RuntimeException("inner failure"); + }, + "inner"); + return outer; + }, + "outer"); + } + + @Override + @Workflow + public String insertNestedInnerCheckedException(String outer, String inner) throws Exception { + return (String) + factory.runTransactionalStep( + () -> { + jdbc.update( + "INSERT INTO greetings(name, count) VALUES (?, 1)" + + " ON CONFLICT(name) DO UPDATE SET count = greetings.count + 1", + outer); + factory.runTransactionalStep( + () -> { + jdbc.update( + "INSERT INTO greetings(name, count) VALUES (?, 1)" + + " ON CONFLICT(name) DO UPDATE SET count = greetings.count + 1", + inner); + throw new Exception("inner checked"); + }, + "inner"); + return outer; + }, + "outer"); + } + + @Override + @Workflow + public String insertViaDbosStep(String name) { + return dbos.runStep( + () -> + (String) + factory.runTransactionalStep( + () -> { + jdbc.update( + "INSERT INTO greetings(name, count) VALUES (?, 1)" + + " ON CONFLICT(name) DO UPDATE SET count = greetings.count + 1", + name); + return name; + }, + "insert"), + "dbosStep"); + } + + @Override + @Workflow + public String insertViaDbosStepWithRuntimeError(String name) { + return dbos.runStep( + () -> + (String) + factory.runTransactionalStep( + () -> { + jdbc.update( + "INSERT INTO greetings(name, count) VALUES (?, 1)" + + " ON CONFLICT(name) DO UPDATE SET count = greetings.count + 1", + name); + throw new RuntimeException("inner failure"); + }, + "insert"), + "dbosStep"); + } + + @Override + @Workflow + public String insertViaDbosStepWithCheckedException(String name) throws Exception { + return dbos.runStep( + () -> + (String) + factory.runTransactionalStep( + () -> { + jdbc.update( + "INSERT INTO greetings(name, count) VALUES (?, 1)" + + " ON CONFLICT(name) DO UPDATE SET count = greetings.count + 1", + name); + throw new Exception("inner checked"); + }, + "insert"), + "dbosStep"); + } + + // Simulates a concurrent winner committing a result while this executor's transaction is still + // open. The separate autocommit connection represents the other executor — its INSERT persists + // even when the Spring transaction manager rolls back the main transaction. When recordOutput + // subsequently tries to INSERT the same (workflowId, stepId) key, it gets a 23505 + // unique-constraint violation. The factory rolls back and falls back to checkExecution. + @Override + @Workflow + public String conflictInsert(String name) throws SQLException { + return (String) + factory.runTransactionalStep( + () -> { + var wfId = Objects.requireNonNull(DBOS.workflowId()); + var stepId = Objects.requireNonNull(DBOS.stepId()); + var value = SerializationUtil.serializeValue("winner", null, null); + var sql = + """ + INSERT INTO "%s".tx_step_outputs(workflow_id, step_id, output, error, serialization) + VALUES (?, ?, ?, NULL, ?) + """ + .formatted(schema); + try (var conn2 = jdbc.getDataSource().getConnection(); + var stmt = conn2.prepareStatement(sql)) { + stmt.setString(1, wfId); + stmt.setInt(2, stepId); + stmt.setString(3, value.serializedValue()); + stmt.setString(4, value.serialization()); + stmt.executeUpdate(); + } + jdbc.update("INSERT INTO greetings(name, count) VALUES (?, 1)", name); + return name; + }, + "conflictInsert"); + } + } + + // ---- DataSourceTransactionManager tests ---- + + @Nested + class WithDataSourceTransactionManager { + + @AutoClose TestDatabase db; + @AutoClose DBOS dbos; + JdbcTemplate jdbc; + TransactionalStepFactory factory; + GreetingService proxy; + + @BeforeEach + void setup() throws SQLException { + db = new TestDatabase(); + jdbc = new JdbcTemplate(db.dataSource); + + try (var conn = db.dataSource.getConnection(); + var stmt = conn.createStatement()) { + stmt.execute( + "CREATE TABLE greetings (name TEXT PRIMARY KEY, count INT NOT NULL DEFAULT 0)"); + } + + dbos = new DBOS(db.dbosConfig()); + var txManager = new DataSourceTransactionManager(db.dataSource); + factory = new TransactionalStepFactory(dbos, db.dataSource, txManager, null); + factory.initialize(); + + var impl = new GreetingServiceImpl(dbos, factory, jdbc, SystemDatabase.sanitizeSchema(null)); + proxy = dbos.registerProxy(GreetingService.class, impl); + dbos.launch(); + } + + @AfterEach + void teardown() { + if (dbos != null) dbos.close(); + } + + @Test + void goldenPath() throws SQLException { + var wfid = "wf-golden"; + try (var _o = new WorkflowOptions(wfid).setContext()) { + var result = proxy.insert("alice"); + assertThat(result).isEqualTo("alice"); + } + + assertThat(greetCount(db.dataSource, "alice")).isEqualTo(1); + var rows = getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNotNull(); + assertThat(rows.get(0).error()).isNull(); + } + + @Test + void idempotency() throws SQLException { + var wfid = "wf-idem"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + proxy.insert("bob"); + } + // second call with same workflow ID — must not re-execute + try (var _o = new WorkflowOptions(wfid).setContext()) { + var result = proxy.insert("bob"); + assertThat(result).isEqualTo("bob"); + } + + assertThat(greetCount(db.dataSource, "bob")).isEqualTo(1); + assertThat(getTxRows(db.dataSource, wfid)).hasSize(1); + } + + @Test + void atomicityOnFailure() throws SQLException { + var wfid = "wf-fail"; + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThatThrownBy(() -> proxy.error("charlie")).isInstanceOf(RuntimeException.class); + } + + // main transaction rolled back — no greeting inserted + assertThat(greetCount(db.dataSource, "charlie")).isEqualTo(0); + var rows = getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNull(); + assertThat(rows.get(0).error()).isNotNull(); + } + + @Test + void voidMethods() throws SQLException { + var wfid = "wf-void"; + try (var _o = new WorkflowOptions(wfid).setContext()) { + proxy.voidStep(); + } + + var rows = getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).error()).isNull(); + } + + @Test + void outsideWorkflow_stepRunsAndCommits() throws SQLException { + // No WorkflowOptions — !DBOS.inWorkflow() is true, so PROPAGATION_REQUIRED is used. + // No idempotency tracking; the step just runs in a fresh transaction and commits. + var result = + factory.runTransactionalStep( + () -> { + jdbc.update("INSERT INTO greetings(name, count) VALUES (?, 1)", "outside"); + return "outside"; + }, + "insert"); + + assertThat(result).isEqualTo("outside"); + assertThat(greetCount(db.dataSource, "outside")).isEqualTo(1); + assertThat(totalTxRows(db.dataSource)).isEqualTo(0); + } + + @Test + void outsideWorkflow_runtimeException_rollsBack() throws SQLException { + assertThatThrownBy( + () -> + factory.runTransactionalStep( + () -> { + jdbc.update( + "INSERT INTO greetings(name, count) VALUES (?, 1)", "outside-fail"); + throw new RuntimeException("intentional"); + }, + "insert")) + .isInstanceOf(RuntimeException.class); + + assertThat(greetCount(db.dataSource, "outside-fail")).isEqualTo(0); + assertThat(totalTxRows(db.dataSource)).isEqualTo(0); + } + + @Test + void outsideWorkflow_checkedException_commits() throws SQLException { + // Matches @Transactional default: checked exceptions do not trigger rollback. + assertThatThrownBy( + () -> + factory.runTransactionalStep( + () -> { + jdbc.update( + "INSERT INTO greetings(name, count) VALUES (?, 1)", "outside-checked"); + throw new Exception("checked"); + }, + "insert")) + .isInstanceOf(Exception.class) + .hasMessage("checked"); + + assertThat(greetCount(db.dataSource, "outside-checked")).isEqualTo(1); + assertThat(totalTxRows(db.dataSource)).isEqualTo(0); + } + + @Test + void nestedStep_innerJoinsOuterTransaction() throws SQLException { + var wfid = "wf-nested"; + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat(proxy.insertWithNestedStep("outer-name", "inner-name")).isEqualTo("outer-name"); + } + + // Both inserts committed as part of the outer step's REQUIRES_NEW transaction. + assertThat(greetCount(db.dataSource, "outer-name")).isEqualTo(1); + assertThat(greetCount(db.dataSource, "inner-name")).isEqualTo(1); + // Only the outer step records to tx_step_outputs; the inner step took the passthrough path. + var rows = getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNotNull(); + } + + @Test + void nestedStep_innerRuntimeException_rollsBackBothWrites() throws SQLException { + var wfid = "wf-nested-rt-fail"; + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThatThrownBy(() -> proxy.insertNestedInnerRuntimeError("outer-rt", "inner-rt")) + .isInstanceOf(RuntimeException.class) + .hasMessage("inner failure"); + } + + // Inner step's rollback marks the outer REQUIRES_NEW tx rollback-only; both writes lost. + assertThat(greetCount(db.dataSource, "outer-rt")).isEqualTo(0); + assertThat(greetCount(db.dataSource, "inner-rt")).isEqualTo(0); + var rows = getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).error()).isNotNull(); + assertThat(rows.get(0).output()).isNull(); + } + + @Test + void nestedStep_innerCheckedException_rollsBackBothWrites() throws SQLException { + var wfid = "wf-nested-checked-fail"; + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThatThrownBy( + () -> proxy.insertNestedInnerCheckedException("outer-checked", "inner-checked")) + .isInstanceOf(Exception.class) + .hasMessage("inner checked"); + } + + // Inner step commits its participation (no-op), but the outer step catches the propagated + // exception, rolls back the REQUIRES_NEW tx, and discards both writes. + assertThat(greetCount(db.dataSource, "outer-checked")).isEqualTo(0); + assertThat(greetCount(db.dataSource, "inner-checked")).isEqualTo(0); + var rows = getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).error()).isNotNull(); + assertThat(rows.get(0).output()).isNull(); + } + + @Test + void dbosStep_transactionalStepRunsAndCommits() throws SQLException { + // @TransactionalStep called from inside a dbos.runStep() lambda: DBOS.inStep() is true, + // so the passthrough path runs with PROPAGATION_REQUIRED. No tx_step_outputs row written. + var wfid = "wf-dbos-step-golden"; + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThat(proxy.insertViaDbosStep("dbos-step")).isEqualTo("dbos-step"); + } + + assertThat(greetCount(db.dataSource, "dbos-step")).isEqualTo(1); + assertThat(totalTxRows(db.dataSource)).isEqualTo(0); + } + + @Test + void dbosStep_runtimeException_rollsBack() throws SQLException { + var wfid = "wf-dbos-step-rt-fail"; + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThatThrownBy(() -> proxy.insertViaDbosStepWithRuntimeError("dbos-step-fail")) + .isInstanceOf(RuntimeException.class) + .hasMessage("inner failure"); + } + + assertThat(greetCount(db.dataSource, "dbos-step-fail")).isEqualTo(0); + assertThat(totalTxRows(db.dataSource)).isEqualTo(0); + } + + @Test + void dbosStep_checkedException_commits() throws SQLException { + // Checked exception: PROPAGATION_REQUIRED passthrough commits the write, rethrows. + var wfid = "wf-dbos-step-checked-fail"; + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThatThrownBy(() -> proxy.insertViaDbosStepWithCheckedException("dbos-step-checked")) + .isInstanceOf(Exception.class) + .hasMessage("inner checked"); + } + + assertThat(greetCount(db.dataSource, "dbos-step-checked")).isEqualTo(1); + assertThat(totalTxRows(db.dataSource)).isEqualTo(0); + } + + // Two executors race to write the result for the same step. The loser detects the 23505 + // conflict on its INSERT, rolls back its transaction, and returns the winner's stored value. + @Test + void upsertConflict() throws SQLException { + var wfid = "wf-conflict"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + var result = proxy.conflictInsert("diana"); + // Returns winner's sentinel value, not what the supplier would have returned + assertThat(result).isEqualTo("winner"); + } + + // Main transaction was rolled back — INSERT into greetings never committed + assertThat(greetCount(db.dataSource, "diana")).isEqualTo(0); + + // Exactly one tx_step_outputs row containing the winner's result + var rows = getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNotNull(); + assertThat(rows.get(0).error()).isNull(); + } + } + + // ---- Custom schema tests ---- + + @Test + void customSchema_explicitSchemaOverride_tableInCustomSchema() throws SQLException { + try (var db = new TestDatabase()) { + try (var dbos = new DBOS(db.dbosConfig())) { + var txManager = new DataSourceTransactionManager(db.dataSource); + var factory = new TransactionalStepFactory(dbos, db.dataSource, txManager, "app_schema"); + factory.initialize(); + dbos.launch(); + + assertThat(tableExists(db.dataSource, "app_schema", "tx_step_outputs")).isTrue(); + assertThat( + tableExists(db.dataSource, SystemDatabase.sanitizeSchema(null), "tx_step_outputs")) + .isFalse(); + } + } + } + + @Test + void customSchema_nullExplicit_fallsBackToDbosConfigSchema() throws SQLException { + try (var db = new TestDatabase()) { + try (var dbos = new DBOS(db.dbosConfig().withDatabaseSchema("cfg_schema"))) { + var txManager = new DataSourceTransactionManager(db.dataSource); + var factory = new TransactionalStepFactory(dbos, db.dataSource, txManager, null); + factory.initialize(); + dbos.launch(); + + assertThat(tableExists(db.dataSource, "cfg_schema", "tx_step_outputs")).isTrue(); + assertThat( + tableExists(db.dataSource, SystemDatabase.sanitizeSchema(null), "tx_step_outputs")) + .isFalse(); + } + } + } + + @Test + void customSchema_bothNull_usesDefaultDbosSchema() throws SQLException { + try (var db = new TestDatabase()) { + try (var dbos = new DBOS(db.dbosConfig())) { + var txManager = new DataSourceTransactionManager(db.dataSource); + var factory = new TransactionalStepFactory(dbos, db.dataSource, txManager, null); + factory.initialize(); + dbos.launch(); + + var defaultSchema = SystemDatabase.sanitizeSchema(null); + assertThat(tableExists(db.dataSource, defaultSchema, "tx_step_outputs")).isTrue(); + } + } + } + + // ---- Lazy initialization test ---- + + @Test + void lazyInitialization_noTransactionalStepMethods_tableNotCreated() throws SQLException { + try (var db = new TestDatabase()) { + var dbosConfig = db.dbosConfig(); + try (var dbos = new DBOS(dbosConfig)) { + var txManager = new DataSourceTransactionManager(db.dataSource); + new TransactionalStepFactory(dbos, db.dataSource, txManager, null); + // initialize() is NOT called — simulating no @TransactionalStep methods found + dbos.launch(); + + var schema = SystemDatabase.sanitizeSchema(null); + assertThat(tableExists(db.dataSource, schema, "tx_step_outputs")).isFalse(); + } + } + } + + // ---- JPA path tests ---- + + @Nested + class WithJpaTransactionManager { + + @AutoClose TestDatabase db; + @AutoClose DBOS dbos; + JdbcTemplate jdbc; + TransactionalStepFactory factory; + GreetingService proxy; + + @BeforeEach + void setup() throws Exception { + db = new TestDatabase(); + jdbc = new JdbcTemplate(db.dataSource); + + try (var conn = db.dataSource.getConnection(); + var stmt = conn.createStatement()) { + stmt.execute( + "CREATE TABLE greetings (name TEXT PRIMARY KEY, count INT NOT NULL DEFAULT 0)"); + } + + dbos = new DBOS(db.dbosConfig()); + + var jpaTxManager = buildJpaTransactionManager(db.dataSource); + factory = new TransactionalStepFactory(dbos, db.dataSource, jpaTxManager, null); + factory.initialize(); + + var impl = new GreetingServiceImpl(dbos, factory, jdbc, SystemDatabase.sanitizeSchema(null)); + proxy = dbos.registerProxy(GreetingService.class, impl); + dbos.launch(); + } + + @AfterEach + void teardown() { + if (dbos != null) dbos.close(); + } + + @Test + void goldenPath() throws SQLException { + var wfid = "wf-jpa-golden"; + try (var _o = new WorkflowOptions(wfid).setContext()) { + var result = proxy.insert("dave"); + assertThat(result).isEqualTo("dave"); + } + + assertThat(greetCount(db.dataSource, "dave")).isEqualTo(1); + var rows = getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNotNull(); + assertThat(rows.get(0).error()).isNull(); + } + + @Test + void atomicityOnFailure() throws SQLException { + var wfid = "wf-jpa-fail"; + try (var _o = new WorkflowOptions(wfid).setContext()) { + assertThatThrownBy(() -> proxy.error("eve")).isInstanceOf(RuntimeException.class); + } + + assertThat(greetCount(db.dataSource, "eve")).isEqualTo(0); + var rows = getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNull(); + assertThat(rows.get(0).error()).isNotNull(); + } + + // Two executors race to write the result for the same step. The loser detects the 23505 + // conflict on its INSERT, rolls back its transaction, and returns the winner's stored value. + @Test + void upsertConflict() throws SQLException { + var wfid = "wf-jpa-conflict"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + var result = proxy.conflictInsert("fiona"); + // Returns winner's sentinel value, not what the supplier would have returned + assertThat(result).isEqualTo("winner"); + } + + // Main transaction was rolled back — INSERT into greetings never committed + assertThat(greetCount(db.dataSource, "fiona")).isEqualTo(0); + + // Exactly one tx_step_outputs row containing the winner's result + var rows = getTxRows(db.dataSource, wfid); + assertThat(rows).hasSize(1); + assertThat(rows.get(0).output()).isNotNull(); + assertThat(rows.get(0).error()).isNull(); + } + + private static JpaTransactionManager buildJpaTransactionManager(DataSource dataSource) { + var emfBean = new LocalContainerEntityManagerFactoryBean(); + emfBean.setDataSource(dataSource); + emfBean.setPackagesToScan(); // no entity classes + emfBean.setJpaVendorAdapter(new HibernateJpaVendorAdapter()); + emfBean.setPersistenceProviderClass(HibernatePersistenceProvider.class); + var props = new Properties(); + props.put("hibernate.hbm2ddl.auto", "none"); + props.put("hibernate.dialect", "org.hibernate.dialect.PostgreSQLDialect"); + emfBean.setJpaProperties(props); + emfBean.afterPropertiesSet(); + + var jpaTxManager = new JpaTransactionManager(emfBean.getObject()); + // Simulating what TransactionalStepAutoConfiguration.JpaBridgeConfiguration does: + // set the dataSource so DataSourceUtils.getConnection() returns the tx-bound connection. + jpaTxManager.setDataSource(dataSource); + jpaTxManager.afterPropertiesSet(); + return jpaTxManager; + } + } +} diff --git a/transact/src/main/java/dev/dbos/transact/txstep/JdbcStepFactory.java b/transact/src/main/java/dev/dbos/transact/txstep/JdbcStepFactory.java index da17e8ef..7d714f52 100644 --- a/transact/src/main/java/dev/dbos/transact/txstep/JdbcStepFactory.java +++ b/transact/src/main/java/dev/dbos/transact/txstep/JdbcStepFactory.java @@ -74,20 +74,11 @@ public JdbcStepFactory( @Override protected Optional checkExecution(String workflowId, int stepId, String stepName) { try (var conn = dataSource.getConnection(); - var stmt = conn.prepareStatement(checkSql())) { + var stmt = conn.prepareStatement(TxStepSchema.checkSql(schema))) { stmt.setString(1, workflowId); stmt.setInt(2, stepId); try (var rs = stmt.executeQuery()) { - if (!rs.next()) return Optional.empty(); - return Optional.of( - new StepResult( - workflowId, - stepId, - stepName, - rs.getString("output"), - rs.getString("error"), - null, - rs.getString("serialization"))); + return TxStepSchema.readResult(rs, workflowId, stepId, stepName); } } catch (SQLException e) { throw new RuntimeException(e); @@ -119,14 +110,26 @@ public interface TransactionalFunction { public R txStep( final TransactionalFunction callback, String stepName) throws X { return runTxStep( - (wfId, stepId) -> - executeTransaction( + (wfId, stepId) -> { + try { + return executeTransaction( dataSource, c -> { var result = callback.execute(c); recordOutput(c, wfId, stepId, result); return result; - }), + }); + } catch (StepConflictException e) { + return checkExecution(wfId, stepId, stepName) + .orElseThrow( + () -> + new IllegalStateException( + "Conflict on tx_step_outputs but winner row not found: workflowId=%s stepId=%d stepName=%s" + .formatted(wfId, stepId, stepName), + e)) + .toResult(serializer); + } + }, stepName); } @@ -204,13 +207,16 @@ private void recordOutput(Connection conn, String workflowId, int stepId, R @Override protected void recordError(String workflowId, int stepId, Exception exception) { var value = SerializationUtil.serializeError(exception, null, serializer); - executeTransaction( - dataSource, - (Connection conn) -> { - recordResult( - conn, workflowId, stepId, null, value.serializedValue(), value.serialization()); - return null; - }); + try { + executeTransaction( + dataSource, + (Connection conn) -> { + recordResult( + conn, workflowId, stepId, null, value.serializedValue(), value.serialization()); + return null; + }); + } catch (StepConflictException ignored) { + } } private void recordResult( @@ -220,7 +226,7 @@ private void recordResult( String output, String error, String serialization) { - try (var stmt = conn.prepareStatement(upsertSql())) { + try (var stmt = conn.prepareStatement(TxStepSchema.upsertSql(schema))) { stmt.setString(1, workflowId); stmt.setInt(2, stepId); stmt.setString(3, output); @@ -228,6 +234,7 @@ private void recordResult( stmt.setString(5, serialization); stmt.executeUpdate(); } catch (SQLException e) { + if (isUniqueViolation(e)) throw new StepConflictException(e); throw new RuntimeException(e); } } diff --git a/transact/src/main/java/dev/dbos/transact/txstep/PostgresStepFactory.java b/transact/src/main/java/dev/dbos/transact/txstep/PostgresStepFactory.java index 8ba79b09..c61405b5 100644 --- a/transact/src/main/java/dev/dbos/transact/txstep/PostgresStepFactory.java +++ b/transact/src/main/java/dev/dbos/transact/txstep/PostgresStepFactory.java @@ -40,57 +40,16 @@ protected PostgresStepFactory( this.serializer = serializer == null ? config.serializer() : serializer; try (var conn = opener.open()) { - // ensure we're running on Postgres - var productName = conn.getMetaData().getDatabaseProductName(); - if (!productName.equalsIgnoreCase("PostgreSQL")) { - throw new IllegalArgumentException( - "TxStepFactory requires a PostgreSQL datasource, got: " + productName); - } - - // ensure provided schema and tx_step_outputs table exist - try (var stmt = conn.createStatement()) { - stmt.addBatch("CREATE SCHEMA IF NOT EXISTS \"%s\"".formatted(this.schema)); - stmt.addBatch( - """ - CREATE TABLE IF NOT EXISTS "%1$s".tx_step_outputs ( - workflow_id TEXT NOT NULL, - step_id INT NOT NULL, - output TEXT, - error TEXT, - serialization TEXT, - created_at BIGINT NOT NULL DEFAULT (EXTRACT(EPOCH FROM now())*1000)::bigint, - PRIMARY KEY (workflow_id, step_id) - )""" - .formatted(this.schema)); - stmt.executeBatch(); - } + TxStepSchema.verifyPostgres(conn); + TxStepSchema.createTable(conn, this.schema); } catch (SQLException e) { throw new RuntimeException(e); } } - protected String checkSql() { - return """ - SELECT output, error, serialization - FROM "%s".tx_step_outputs - WHERE workflow_id = ? AND step_id = ? - """ - .formatted(schema); - } - protected abstract Optional checkExecution( String workflowId, int stepId, String stepName); - protected String upsertSql() { - return """ - INSERT INTO "%s".tx_step_outputs - (workflow_id, step_id, output, error, serialization) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT DO NOTHING - """ - .formatted(schema); - } - protected abstract void recordError(String workflowId, int stepId, Exception exception); @FunctionalInterface @@ -98,6 +57,19 @@ protected interface TxStepFunction { R execute(String workflowId, int stepId) throws X; } + public static final class StepConflictException extends RuntimeException { + public StepConflictException(Exception cause) { + super(cause); + } + } + + public static boolean isUniqueViolation(Exception e) { + for (Throwable t = e; t != null; t = t.getCause()) { + if (t instanceof SQLException sq && "23505".equals(sq.getSQLState())) return true; + } + return false; + } + @SuppressWarnings("unchecked") protected R runTxStep(TxStepFunction execute, String stepName) throws X { diff --git a/transact/src/main/java/dev/dbos/transact/txstep/TxStepSchema.java b/transact/src/main/java/dev/dbos/transact/txstep/TxStepSchema.java new file mode 100644 index 00000000..0abf5cc7 --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/txstep/TxStepSchema.java @@ -0,0 +1,73 @@ +package dev.dbos.transact.txstep; + +import dev.dbos.transact.workflow.internal.StepResult; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Optional; + +/** Shared SQL DDL and query constants for the {@code tx_step_outputs} table. */ +public class TxStepSchema { + + private TxStepSchema() {} + + public static void verifyPostgres(Connection conn) throws SQLException { + var productName = conn.getMetaData().getDatabaseProductName(); + if (!productName.equalsIgnoreCase("PostgreSQL")) { + throw new IllegalArgumentException( + "TxStepFactory requires a PostgreSQL datasource, got: " + productName); + } + } + + public static void createTable(Connection conn, String schema) throws SQLException { + try (var stmt = conn.createStatement()) { + stmt.addBatch("CREATE SCHEMA IF NOT EXISTS \"%s\"".formatted(schema)); + stmt.addBatch( + """ + CREATE TABLE IF NOT EXISTS "%1$s".tx_step_outputs ( + workflow_id TEXT NOT NULL, + step_id INT NOT NULL, + output TEXT, + error TEXT, + serialization TEXT, + created_at BIGINT NOT NULL DEFAULT (EXTRACT(EPOCH FROM now())*1000)::bigint, + PRIMARY KEY (workflow_id, step_id) + )""" + .formatted(schema)); + stmt.executeBatch(); + } + } + + public static String checkSql(String schema) { + return """ + SELECT output, error, serialization + FROM "%s".tx_step_outputs + WHERE workflow_id = ? AND step_id = ? + """ + .formatted(schema); + } + + public static String upsertSql(String schema) { + return """ + INSERT INTO "%s".tx_step_outputs + (workflow_id, step_id, output, error, serialization) + VALUES (?, ?, ?, ?, ?) + """ + .formatted(schema); + } + + public static Optional readResult( + ResultSet rs, String workflowId, int stepId, String stepName) throws SQLException { + if (!rs.next()) return Optional.empty(); + return Optional.of( + new StepResult( + workflowId, + stepId, + stepName, + rs.getString("output"), + rs.getString("error"), + null, + rs.getString("serialization"))); + } +} diff --git a/transact/src/test/java/dev/dbos/transact/txstep/JdbcStepFactoryTest.java b/transact/src/test/java/dev/dbos/transact/txstep/JdbcStepFactoryTest.java index e2cba816..0facfa12 100644 --- a/transact/src/test/java/dev/dbos/transact/txstep/JdbcStepFactoryTest.java +++ b/transact/src/test/java/dev/dbos/transact/txstep/JdbcStepFactoryTest.java @@ -19,6 +19,8 @@ import java.sql.SQLException; import java.util.Objects; +import javax.sql.DataSource; + import com.zaxxer.hikari.HikariDataSource; import org.junit.jupiter.api.AutoClose; import org.junit.jupiter.api.BeforeEach; @@ -34,14 +36,18 @@ record TestResult(String user, int greetCount) {} TestResult readWorkflow(String user) throws SQLException; TestResult insertThenReadWorkflow(String user) throws SQLException; + + TestResult conflictWorkflow(String user) throws SQLException; } class FactoryTestServiceImpl implements FactoryTestService { private final JdbcStepFactory stepFactory; + private final DataSource dataSource; - public FactoryTestServiceImpl(JdbcStepFactory stepFactory) { + public FactoryTestServiceImpl(JdbcStepFactory stepFactory, DataSource dataSource) { this.stepFactory = stepFactory; + this.dataSource = dataSource; } TestResult insertGreeting(Connection conn, String user) throws SQLException { @@ -108,6 +114,37 @@ public TestResult insertThenReadWorkflow(String user) throws SQLException { stepFactory.txStep((Connection c) -> insertGreeting(c, user), "insertGreeting"); return stepFactory.txStep((Connection c) -> readGreeting(c, user), "readGreeting"); } + + // Simulates a concurrent winner committing a result while this executor's transaction is still + // open. The separate autocommit connection represents the other executor — its INSERT persists + // even when the main transaction is rolled back. When recordOutput subsequently tries to INSERT + // the same (workflowId, stepId) key, it gets a 23505 unique-constraint violation. The factory + // rolls back the main transaction and falls back to checkExecution to return the winner's value. + TestResult conflictGreeting(Connection conn, String user, TestResult winner) throws SQLException { + var wfId = Objects.requireNonNull(DBOS.workflowId()); + var value = SerializationUtil.serializeValue(winner, null, null); + var sql = + """ + INSERT INTO "%s".tx_step_outputs(workflow_id, step_id, output, error, serialization) + VALUES (?, 0, ?, NULL, ?) + """ + .formatted(stepFactory.schema); + try (var conn2 = dataSource.getConnection(); + var stmt = conn2.prepareStatement(sql)) { + stmt.setString(1, wfId); + stmt.setString(2, value.serializedValue()); + stmt.setString(3, value.serialization()); + stmt.executeUpdate(); + } + return insertGreeting(conn, user); + } + + @Override + @Workflow + public TestResult conflictWorkflow(String user) throws SQLException { + var winner = new TestResult(user, 99); + return stepFactory.txStep((Connection c) -> conflictGreeting(c, user, winner), "conflictStep"); + } } public class JdbcStepFactoryTest { @@ -139,7 +176,7 @@ void beforeEach() throws SQLException { dbos = new DBOS(dbosConfig); stepFactory = new JdbcStepFactory(dbos, dataSource); - impl = new FactoryTestServiceImpl(stepFactory); + impl = new FactoryTestServiceImpl(stepFactory, dataSource); proxy = dbos.registerProxy(FactoryTestService.class, impl); dbos.launch(); @@ -381,6 +418,33 @@ public void testRetryPartialMultipleSteps() throws Exception { assertTrue(txSteps.get(1).createdAt() >= relaunchTimestamp); // step 1: re-executed on retry } + // Two executors race to write the result for the same step. The loser detects the 23505 + // conflict on its INSERT, rolls back its transaction, and returns the winner's stored value. + @Test + public void testUpsertConflict() throws Exception { + var wfid = "wf-conflict"; + var user = "testUser"; + + try (var _o = new WorkflowOptions(wfid).setContext()) { + var result = proxy.conflictWorkflow(user); + // Returns winner's sentinel value (99), not what insertGreeting would have produced (1) + assertEquals(99, result.greetCount()); + assertEquals(user, result.user()); + } + + // Main transaction was rolled back — insertGreeting's write never committed + assertEquals(0, getGreetCount(user)); + + // Exactly one tx_step_outputs row containing the winner's result + var rows = DBUtils.getTxStepRows(dataSource, wfid); + assertEquals(1, rows.size()); + var row = rows.get(0); + assertNotNull(row.output()); + assertNull(row.error()); + var output = SerializationUtil.deserializeValue(row.output(), row.serialization(), null); + assertEquals(new FactoryTestService.TestResult(user, 99), output); + } + @Test public void testRetryInsert() throws Exception { var timestamp = System.currentTimeMillis();