99import dev .dbos .transact .DBOS ;
1010import dev .dbos .transact .config .DBOSConfig ;
1111import dev .dbos .transact .context .WorkflowOptions ;
12+ import dev .dbos .transact .database .SystemDatabase ;
1213import dev .dbos .transact .json .SerializationUtil ;
1314import dev .dbos .transact .utils .DBUtils ;
1415import dev .dbos .transact .utils .PgContainer ;
1819import java .sql .SQLException ;
1920import java .util .Objects ;
2021
22+ import javax .sql .DataSource ;
23+
2124import com .zaxxer .hikari .HikariDataSource ;
2225import org .jdbi .v3 .core .Handle ;
2326import org .jdbi .v3 .core .Jdbi ;
@@ -35,14 +38,20 @@ record TestResult(String user, int greetCount) {}
3538 TestResult readWorkflow (String user );
3639
3740 TestResult insertThenReadWorkflow (String user );
41+
42+ TestResult conflictWorkflow (String user ) throws SQLException ;
3843}
3944
4045class FactoryTestServiceImpl implements FactoryTestService {
4146
4247 private final JdbiStepFactory stepFactory ;
48+ private final DataSource dataSource ;
49+ final String schema ;
4350
44- public FactoryTestServiceImpl (JdbiStepFactory stepFactory ) {
51+ public FactoryTestServiceImpl (JdbiStepFactory stepFactory , DataSource dataSource , String schema ) {
4552 this .stepFactory = stepFactory ;
53+ this .dataSource = dataSource ;
54+ this .schema = schema ;
4655 }
4756
4857 FactoryTestService .TestResult insertGreeting (Handle handle , String user ) {
@@ -106,6 +115,38 @@ public FactoryTestService.TestResult insertThenReadWorkflow(String user) {
106115 stepFactory .useStep ((Handle h ) -> insertGreeting (h , user ), "insertGreeting" );
107116 return stepFactory .inStep ((Handle h ) -> readGreeting (h , user ), "readGreeting" );
108117 }
118+
119+ // Simulates a concurrent winner committing a result while this executor's transaction is still
120+ // open. The separate autocommit connection represents the other executor — its INSERT persists
121+ // even when JDBI rolls back the main transaction. When recordResult subsequently tries to INSERT
122+ // the same (workflowId, stepId) key, it gets a 23505 unique-constraint violation. The factory
123+ // rolls back and falls back to checkExecution to return the winner's value.
124+ FactoryTestService .TestResult conflictGreeting (
125+ Handle handle , String user , FactoryTestService .TestResult winner ) throws SQLException {
126+ var wfId = Objects .requireNonNull (DBOS .workflowId ());
127+ var value = SerializationUtil .serializeValue (winner , null , null );
128+ var sql =
129+ """
130+ INSERT INTO "%s".tx_step_outputs(workflow_id, step_id, output, error, serialization)
131+ VALUES (?, 0, ?, NULL, ?)
132+ """
133+ .formatted (schema );
134+ try (var conn2 = dataSource .getConnection ();
135+ var stmt = conn2 .prepareStatement (sql )) {
136+ stmt .setString (1 , wfId );
137+ stmt .setString (2 , value .serializedValue ());
138+ stmt .setString (3 , value .serialization ());
139+ stmt .executeUpdate ();
140+ }
141+ return insertGreeting (handle , user );
142+ }
143+
144+ @ Override
145+ @ Workflow
146+ public FactoryTestService .TestResult conflictWorkflow (String user ) throws SQLException {
147+ var winner = new FactoryTestService .TestResult (user , 99 );
148+ return stepFactory .inStep ((Handle h ) -> conflictGreeting (h , user , winner ), "conflictStep" );
149+ }
109150}
110151
111152public class JdbiStepFactoryTest {
@@ -134,7 +175,9 @@ void beforeEach() throws SQLException {
134175 Jdbi jdbi = Jdbi .create (dataSource );
135176 stepFactory = new JdbiStepFactory (dbos , jdbi );
136177
137- impl = new FactoryTestServiceImpl (stepFactory );
178+ impl =
179+ new FactoryTestServiceImpl (
180+ stepFactory , dataSource , SystemDatabase .sanitizeSchema (dbosConfig .databaseSchema ()));
138181 proxy = dbos .registerProxy (FactoryTestService .class , impl );
139182
140183 dbos .launch ();
@@ -376,6 +419,33 @@ public void testRetryPartialMultipleSteps() throws Exception {
376419 assertTrue (txSteps .get (1 ).createdAt () >= relaunchTimestamp ); // step 1: re-executed on retry
377420 }
378421
422+ // Two executors race to write the result for the same step. The loser detects the 23505
423+ // conflict on its INSERT, rolls back its transaction, and returns the winner's stored value.
424+ @ Test
425+ public void testUpsertConflict () throws Exception {
426+ var wfid = "wf-conflict" ;
427+ var user = "testUser" ;
428+
429+ try (var _o = new WorkflowOptions (wfid ).setContext ()) {
430+ var result = proxy .conflictWorkflow (user );
431+ // Returns winner's sentinel value (99), not what insertGreeting would have produced (1)
432+ assertEquals (99 , result .greetCount ());
433+ assertEquals (user , result .user ());
434+ }
435+
436+ // Main transaction was rolled back — insertGreeting's write never committed
437+ assertEquals (0 , getGreetCount (user ));
438+
439+ // Exactly one tx_step_outputs row containing the winner's result
440+ var rows = DBUtils .getTxStepRows (dataSource , wfid );
441+ assertEquals (1 , rows .size ());
442+ var row = rows .get (0 );
443+ assertNotNull (row .output ());
444+ assertNull (row .error ());
445+ var output = SerializationUtil .deserializeValue (row .output (), row .serialization (), null );
446+ assertEquals (new FactoryTestService .TestResult (user , 99 ), output );
447+ }
448+
379449 @ Test
380450 public void testRetryInsert () throws Exception {
381451 var timestamp = System .currentTimeMillis ();
0 commit comments