Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ asm = "9.9.1"
aspectj = "1.9.22.1"
assertj = "3.27.7"
cron-utils = "9.2.1"
hibernate = "6.6.15.Final"
hikaricp = "7.0.2"
jackson = "2.21.3"
java-websocket = "1.6.0"
Expand Down Expand Up @@ -46,6 +47,7 @@ jackson-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr
java-websocket = { module = "org.java-websocket:Java-WebSocket", version.ref = "java-websocket" }
jaxb-api = { module = "jakarta.xml.bind:jakarta.xml.bind-api", version.ref = "jaxb-api" }
jdbi-core = { module = "org.jdbi:jdbi3-core", version.ref = "jdbi" }
jdbi-spring = { module = "org.jdbi:jdbi3-spring", version.ref = "jdbi" }
jooq = { module = "org.jooq:jooq", version.ref = "jooq" }
jspecify = { module = "org.jspecify:jspecify", version.ref = "jspecify" }
junit-bom = { module = "org.junit:junit-bom", version.ref = "junit" }
Expand All @@ -62,7 +64,11 @@ postgresql = { module = "org.postgresql:postgresql", version.ref = "postgresql"
rest-assured = { module = "io.rest-assured:rest-assured", version.ref = "rest-assured" }
slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
hibernate-core = { module = "org.hibernate.orm:hibernate-core", version.ref = "hibernate" }
spring-aop = { module = "org.springframework:spring-aop", version.ref = "spring-framework" }
spring-jdbc = { module = "org.springframework:spring-jdbc", version.ref = "spring-framework" }
spring-orm = { module = "org.springframework:spring-orm", version.ref = "spring-framework" }
spring-tx = { module = "org.springframework:spring-tx", version.ref = "spring-framework" }
spring-boot-autoconfigure = { module = "org.springframework.boot:spring-boot-autoconfigure", version.ref = "spring-boot" }
spring-boot-configuration-processor = { module = "org.springframework.boot:spring-boot-configuration-processor", version.ref = "spring-boot" }
spring-boot-test = { module = "org.springframework.boot:spring-boot-test", version.ref = "spring-boot" }
Expand Down
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ include(
"transact",
"transact-cli",
"transact-spring-boot-starter",
"transact-spring-txstep-starter",
"transact-jdbi-step-factory",
"transact-jooq-step-factory",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import dev.dbos.transact.json.DBOSSerializer;
import dev.dbos.transact.json.SerializationUtil;
import dev.dbos.transact.txstep.PostgresStepFactory;
import dev.dbos.transact.txstep.TxStepSchema;
import dev.dbos.transact.workflow.internal.StepResult;

import java.util.Objects;
Expand All @@ -13,6 +14,7 @@
import org.jdbi.v3.core.HandleCallback;
import org.jdbi.v3.core.HandleConsumer;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.statement.UnableToExecuteStatementException;

/**
* Runs idempotent transactional steps inside DBOS workflows using Jdbi3 {@link Handle} objects.
Expand Down Expand Up @@ -107,13 +109,25 @@ public JdbiStepFactory(DBOS dbos, Jdbi jdbi, String schema, DBOSSerializer seria
public <R, X extends Exception> R inStep(final HandleCallback<R, X> callback, String stepName)
throws X {
return runTxStep(
(wfId, stepId) ->
jdbi.inTransaction(
(wfId, stepId) -> {
try {
return jdbi.inTransaction(
h -> {
var result = callback.withHandle(h);
recordOutput(h, wfId, stepId, result);
return result;
}),
});
} catch (StepConflictException e) {
return checkExecution(wfId, stepId, stepName)
.orElseThrow(
() ->
new IllegalStateException(
"Conflict on tx_step_outputs but winner row not found: workflowId=%s stepId=%d stepName=%s"
.formatted(wfId, stepId, stepName),
e))
.<R, X>toResult(serializer);
}
},
stepName);
}

Expand Down Expand Up @@ -144,7 +158,7 @@ public <X extends Exception> void useStep(final HandleConsumer<X> callback, Stri
protected Optional<StepResult> checkExecution(String workflowId, int stepId, String stepName) {
return jdbi.withHandle(
h ->
h.createQuery(checkSql())
h.createQuery(TxStepSchema.checkSql(schema))
.bind(0, workflowId)
.bind(1, stepId)
.map(
Expand All @@ -168,10 +182,13 @@ private <R> void recordOutput(Handle handle, String workflowId, int stepId, R re
@Override
protected void recordError(String workflowId, int stepId, Exception exception) {
var value = SerializationUtil.serializeError(exception, null, serializer);
jdbi.useTransaction(
h ->
recordResult(
h, workflowId, stepId, null, value.serializedValue(), value.serialization()));
try {
jdbi.useTransaction(
h ->
recordResult(
h, workflowId, stepId, null, value.serializedValue(), value.serialization()));
} catch (StepConflictException ignored) {
}
}

private void recordResult(
Expand All @@ -181,13 +198,18 @@ private void recordResult(
String output,
String error,
String serialization) {
handle
.createUpdate(upsertSql())
.bind(0, workflowId)
.bind(1, stepId)
.bind(2, output)
.bind(3, error)
.bind(4, serialization)
.execute();
try {
handle
.createUpdate(TxStepSchema.upsertSql(schema))
.bind(0, workflowId)
.bind(1, stepId)
.bind(2, output)
.bind(3, error)
.bind(4, serialization)
.execute();
} catch (UnableToExecuteStatementException e) {
if (isUniqueViolation(e)) throw new StepConflictException(e);
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import dev.dbos.transact.DBOS;
import dev.dbos.transact.config.DBOSConfig;
import dev.dbos.transact.context.WorkflowOptions;
import dev.dbos.transact.database.SystemDatabase;
import dev.dbos.transact.json.SerializationUtil;
import dev.dbos.transact.utils.DBUtils;
import dev.dbos.transact.utils.PgContainer;
Expand All @@ -18,6 +19,8 @@
import java.sql.SQLException;
import java.util.Objects;

import javax.sql.DataSource;

import com.zaxxer.hikari.HikariDataSource;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
Expand All @@ -35,14 +38,20 @@ record TestResult(String user, int greetCount) {}
TestResult readWorkflow(String user);

TestResult insertThenReadWorkflow(String user);

TestResult conflictWorkflow(String user) throws SQLException;
}

class FactoryTestServiceImpl implements FactoryTestService {

private final JdbiStepFactory stepFactory;
private final DataSource dataSource;
final String schema;

public FactoryTestServiceImpl(JdbiStepFactory stepFactory) {
public FactoryTestServiceImpl(JdbiStepFactory stepFactory, DataSource dataSource, String schema) {
this.stepFactory = stepFactory;
this.dataSource = dataSource;
this.schema = schema;
}

FactoryTestService.TestResult insertGreeting(Handle handle, String user) {
Expand Down Expand Up @@ -106,6 +115,38 @@ public FactoryTestService.TestResult insertThenReadWorkflow(String user) {
stepFactory.useStep((Handle h) -> insertGreeting(h, user), "insertGreeting");
return stepFactory.inStep((Handle h) -> readGreeting(h, user), "readGreeting");
}

// Simulates a concurrent winner committing a result while this executor's transaction is still
// open. The separate autocommit connection represents the other executor — its INSERT persists
// even when JDBI rolls back the main transaction. When recordResult subsequently tries to INSERT
// the same (workflowId, stepId) key, it gets a 23505 unique-constraint violation. The factory
// rolls back and falls back to checkExecution to return the winner's value.
FactoryTestService.TestResult conflictGreeting(
Handle handle, String user, FactoryTestService.TestResult winner) throws SQLException {
var wfId = Objects.requireNonNull(DBOS.workflowId());
var value = SerializationUtil.serializeValue(winner, null, null);
var sql =
"""
INSERT INTO "%s".tx_step_outputs(workflow_id, step_id, output, error, serialization)
VALUES (?, 0, ?, NULL, ?)
"""
.formatted(schema);
try (var conn2 = dataSource.getConnection();
var stmt = conn2.prepareStatement(sql)) {
stmt.setString(1, wfId);
stmt.setString(2, value.serializedValue());
stmt.setString(3, value.serialization());
stmt.executeUpdate();
}
return insertGreeting(handle, user);
}

@Override
@Workflow
public FactoryTestService.TestResult conflictWorkflow(String user) throws SQLException {
var winner = new FactoryTestService.TestResult(user, 99);
return stepFactory.inStep((Handle h) -> conflictGreeting(h, user, winner), "conflictStep");
}
}

public class JdbiStepFactoryTest {
Expand Down Expand Up @@ -134,7 +175,9 @@ void beforeEach() throws SQLException {
Jdbi jdbi = Jdbi.create(dataSource);
stepFactory = new JdbiStepFactory(dbos, jdbi);

impl = new FactoryTestServiceImpl(stepFactory);
impl =
new FactoryTestServiceImpl(
stepFactory, dataSource, SystemDatabase.sanitizeSchema(dbosConfig.databaseSchema()));
proxy = dbos.registerProxy(FactoryTestService.class, impl);

dbos.launch();
Expand Down Expand Up @@ -376,6 +419,33 @@ public void testRetryPartialMultipleSteps() throws Exception {
assertTrue(txSteps.get(1).createdAt() >= relaunchTimestamp); // step 1: re-executed on retry
}

// Two executors race to write the result for the same step. The loser detects the 23505
// conflict on its INSERT, rolls back its transaction, and returns the winner's stored value.
@Test
public void testUpsertConflict() throws Exception {
var wfid = "wf-conflict";
var user = "testUser";

try (var _o = new WorkflowOptions(wfid).setContext()) {
var result = proxy.conflictWorkflow(user);
// Returns winner's sentinel value (99), not what insertGreeting would have produced (1)
assertEquals(99, result.greetCount());
assertEquals(user, result.user());
}

// Main transaction was rolled back — insertGreeting's write never committed
assertEquals(0, getGreetCount(user));

// Exactly one tx_step_outputs row containing the winner's result
var rows = DBUtils.getTxStepRows(dataSource, wfid);
assertEquals(1, rows.size());
var row = rows.get(0);
assertNotNull(row.output());
assertNull(row.error());
var output = SerializationUtil.deserializeValue(row.output(), row.serialization(), null);
assertEquals(new FactoryTestService.TestResult(user, 99), output);
}

@Test
public void testRetryInsert() throws Exception {
var timestamp = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import dev.dbos.transact.json.DBOSSerializer;
import dev.dbos.transact.json.SerializationUtil;
import dev.dbos.transact.txstep.PostgresStepFactory;
import dev.dbos.transact.txstep.TxStepSchema;
import dev.dbos.transact.workflow.internal.StepResult;

import java.util.Objects;
Expand All @@ -13,6 +14,7 @@
import org.jooq.DSLContext;
import org.jooq.TransactionalCallable;
import org.jooq.TransactionalRunnable;
import org.jooq.exception.DataAccessException;

/**
* Runs idempotent transactional steps inside DBOS workflows using jOOQ {@link DSLContext} objects.
Expand Down Expand Up @@ -86,13 +88,25 @@ public JooqStepFactory(DBOS dbos, DSLContext dsl, String schema, DBOSSerializer
*/
public <T> T txStepResult(TransactionalCallable<T> callback, String stepName) {
return runTxStep(
(wfId, stepId) ->
dsl.transactionResult(
(wfId, stepId) -> {
try {
return dsl.transactionResult(
trx -> {
var result = callback.run(trx);
recordOutput(trx, wfId, stepId, result);
return result;
}),
});
} catch (StepConflictException e) {
return checkExecution(wfId, stepId, stepName)
.orElseThrow(
() ->
new IllegalStateException(
"Conflict on tx_step_outputs but winner row not found: workflowId=%s stepId=%d stepName=%s"
.formatted(wfId, stepId, stepName),
e))
.<T, RuntimeException>toResult(serializer);
}
},
stepName);
}

Expand All @@ -119,7 +133,7 @@ public void txStep(TransactionalRunnable transactional, String stepName) {

@Override
protected Optional<StepResult> checkExecution(String workflowId, int stepId, String stepName) {
return dsl.fetchOptional(checkSql(), workflowId, stepId)
return dsl.fetchOptional(TxStepSchema.checkSql(schema), workflowId, stepId)
.map(
r ->
new StepResult(
Expand All @@ -141,15 +155,18 @@ private <R> void recordOutput(Configuration trx, String workflowId, int stepId,
@Override
protected void recordError(String workflowId, int stepId, Exception exception) {
var value = SerializationUtil.serializeError(exception, null, serializer);
dsl.transaction(
trx ->
recordResult(
trx.dsl(),
workflowId,
stepId,
null,
value.serializedValue(),
value.serialization()));
try {
dsl.transaction(
trx ->
recordResult(
trx.dsl(),
workflowId,
stepId,
null,
value.serializedValue(),
value.serialization()));
} catch (StepConflictException ignored) {
}
}

private void recordResult(
Expand All @@ -159,6 +176,11 @@ private void recordResult(
String output,
String error,
String serialization) {
ctx.execute(upsertSql(), workflowId, stepId, output, error, serialization);
try {
ctx.execute(TxStepSchema.upsertSql(schema), workflowId, stepId, output, error, serialization);
} catch (DataAccessException e) {
if (isUniqueViolation(e)) throw new StepConflictException(e);
throw e;
}
}
}
Loading