Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ aspectj = "1.9.22.1"
assertj = "3.27.3"
cron-utils = "9.2.1"
hikaricp = "7.0.2"
kryo = "5.6.2"
jackson = "2.21.2"
java-websocket = "1.6.0"
jaxb-api = "4.0.2"
jdbi = "3.47.0"
jooq = "3.19.15"
jspecify = "1.0.0"
junit = "6.0.3"
junit-pioneer = "2.3.0"
kotlin = "2.3.10"
kryo = "5.6.2"
logback = "1.5.32"
maven-artifact = "3.9.13"
maven-publish = "0.36.0"
Expand All @@ -20,10 +23,10 @@ postgresql = "42.7.10"
rest-assured = "6.0.0"
shadow = "9.4.1"
slf4j = "2.0.17"
sqlite-jdbc = "3.49.1.0"
spotless = "8.4.0"
spring-boot = "3.4.4"
spring-framework = "6.2.5"
sqlite-jdbc = "3.49.1.0"
system-stubs = "2.1.8"
testcontainers = "2.0.4"
versions = "0.53.0"
Expand All @@ -37,6 +40,9 @@ hikaricp = { module = "com.zaxxer:HikariCP", version.ref = "hikaricp" }
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" }
jackson-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" }
java-websocket = { module = "org.java-websocket:Java-WebSocket", version.ref = "java-websocket" }
jaxb-api = { module = "jakarta.xml.bind:jakarta.xml.bind-api", version.ref = "jaxb-api" }
jdbi-core = { module = "org.jdbi:jdbi3-core", version.ref = "jdbi" }
jooq = { module = "org.jooq:jooq", version.ref = "jooq" }
jspecify = { module = "org.jspecify:jspecify", version.ref = "jspecify" }
junit-bom = { module = "org.junit:junit-bom", version.ref = "junit" }
junit-jupiter = { module = "org.junit.jupiter:junit-jupiter" }
Expand All @@ -51,11 +57,11 @@ postgresql = { module = "org.postgresql:postgresql", version.ref = "postgresql"
rest-assured = { module = "io.rest-assured:rest-assured", version.ref = "rest-assured" }
slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
sqlite-jdbc = { module = "org.xerial:sqlite-jdbc", version.ref = "sqlite-jdbc" }
spring-aop = { module = "org.springframework:spring-aop", version.ref = "spring-framework" }
spring-boot-autoconfigure = { module = "org.springframework.boot:spring-boot-autoconfigure", version.ref = "spring-boot" }
spring-boot-configuration-processor = { module = "org.springframework.boot:spring-boot-configuration-processor", version.ref = "spring-boot" }
spring-boot-test = { module = "org.springframework.boot:spring-boot-test", version.ref = "spring-boot" }
sqlite-jdbc = { module = "org.xerial:sqlite-jdbc", version.ref = "sqlite-jdbc" }
system-stubs-jupiter = { module = "uk.org.webcompere:system-stubs-jupiter", version.ref = "system-stubs" }
testcontainers-postgresql = { module = "org.testcontainers:testcontainers-postgresql", version.ref = "testcontainers" }

Expand Down
8 changes: 7 additions & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
rootProject.name = "dbos-transact-java"

include("transact", "transact-cli", "transact-spring-boot-starter")
include(
"transact",
"transact-cli",
"transact-spring-boot-starter",
"transact-jdbi-step-factory",
"transact-jooq-step-factory",
)

plugins { id("org.gradle.toolchains.foojay-resolver") version "1.0.0" }

Expand Down
22 changes: 22 additions & 0 deletions transact-jdbi-step-factory/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
plugins { id("java-library") }

tasks.withType<JavaCompile> {
options.compilerArgs.add("-Xlint:unchecked")
options.compilerArgs.add("-Xlint:deprecation")
options.compilerArgs.add("-Xlint:rawtypes")
options.compilerArgs.add("-Werror")
}

dependencies {
api(project(":transact"))
api(libs.jdbi.core)

testImplementation(platform(libs.junit.bom))
testImplementation(libs.junit.jupiter)
testRuntimeOnly(libs.junit.platform.launcher)

testRuntimeOnly(libs.logback.classic)
testImplementation(libs.testcontainers.postgresql)
testImplementation(libs.postgresql)
testImplementation(libs.hikaricp)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package dev.dbos.transact.jdbi;

import dev.dbos.transact.DBOS;
import dev.dbos.transact.json.DBOSSerializer;
import dev.dbos.transact.json.SerializationUtil;
import dev.dbos.transact.txstep.PostgresStepFactory;
import dev.dbos.transact.workflow.internal.StepResult;

import java.util.Objects;
import java.util.Optional;

import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.HandleCallback;
import org.jdbi.v3.core.HandleConsumer;
import org.jdbi.v3.core.Jdbi;

/**
* Runs idempotent transactional steps inside DBOS workflows using Jdbi3 {@link Handle} objects.
*
* <p>Construct one with a {@link Jdbi} instance pointing at a PostgreSQL database. The constructor
* verifies the datasource is PostgreSQL and creates the {@code tx_step_outputs} table if needed.
* Lambdas passed to {@link #inStep} or {@link #useStep} receive a {@link Handle} with a transaction
* already open; they must not call {@code commit} or {@code close} themselves.
*
* <pre>{@code
* JdbiStepFactory factory = new JdbiStepFactory(dbos, Jdbi.create(dataSource));
*
* // inside a @Workflow method:
* int count = factory.inStep(handle -> {
* return handle.createUpdate("INSERT INTO ...").execute();
* }, "myStep");
* }</pre>
*/
public class JdbiStepFactory extends PostgresStepFactory {

private final Jdbi jdbi;

/**
* Creates a factory using the schema and serializer from {@code dbos} configuration.
*
* @param dbos the DBOS runtime instance
* @param jdbi a Jdbi instance connected to a PostgreSQL database
*/
public JdbiStepFactory(DBOS dbos, Jdbi jdbi) {
this(dbos, jdbi, null, null);
}

/**
* Creates a factory using the given schema and the serializer from {@code dbos} configuration.
*
* @param dbos the DBOS runtime instance
* @param jdbi a Jdbi instance connected to a PostgreSQL database
* @param schema the PostgreSQL schema to use for {@code tx_step_outputs}; {@code null} uses the
* schema from {@code dbos} configuration
*/
public JdbiStepFactory(DBOS dbos, Jdbi jdbi, String schema) {
this(dbos, jdbi, schema, null);
}

/**
* Creates a factory using the given serializer and the schema from {@code dbos} configuration.
*
* @param dbos the DBOS runtime instance
* @param jdbi a Jdbi instance connected to a PostgreSQL database
* @param serializer the serializer to use for step outputs; {@code null} uses the serializer from
* {@code dbos} configuration
*/
public JdbiStepFactory(DBOS dbos, Jdbi jdbi, DBOSSerializer serializer) {
this(dbos, jdbi, null, serializer);
}

/**
* Creates a factory with explicit schema and serializer overrides.
*
* <p>Connects to the database immediately to verify it is PostgreSQL and to create the {@code
* tx_step_outputs} table in the given schema if it does not already exist.
*
* @param dbos the DBOS runtime instance
* @param jdbi a Jdbi instance connected to a PostgreSQL database
* @param schema the PostgreSQL schema to use for {@code tx_step_outputs}; {@code null} uses the
* schema from {@code dbos} configuration
* @param serializer the serializer to use for step outputs; {@code null} uses the serializer from
* {@code dbos} configuration
* @throws RuntimeException if the datasource is not PostgreSQL or the schema setup fails
*/
public JdbiStepFactory(DBOS dbos, Jdbi jdbi, String schema, DBOSSerializer serializer) {
super(dbos, schema, serializer, () -> jdbi.open().getConnection());
this.jdbi = Objects.requireNonNull(jdbi);
}

/**
* Executes {@code callback} as an idempotent DBOS step inside a Jdbi transaction.
*
* <p>If a result for this step is already recorded (e.g. on workflow retry), the callback is
* skipped and the cached result is returned. Otherwise the callback runs inside an open
* transaction; the output is recorded atomically with the database work so the step is
* exactly-once on success.
*
* @param <R> the return type of the callback
* @param <X> the checked exception type the callback may throw
* @param callback the database work to perform; receives an open {@link Handle} and must not
* commit or close it
* @param stepName a stable name that identifies this step within the workflow
* @return the value returned by {@code callback}
* @throws X if the callback throws
*/
public <R, X extends Exception> R inStep(final HandleCallback<R, X> callback, String stepName)
throws X {
return runTxStep(
(wfId, stepId) ->
jdbi.inTransaction(
h -> {
var result = callback.withHandle(h);
recordOutput(h, wfId, stepId, result);
return result;
}),
stepName);
}

/**
* Executes {@code callback} as an idempotent DBOS step inside a Jdbi transaction, with no return
* value.
*
* <p>Behaves identically to {@link #inStep} but accepts a {@link HandleConsumer} for callers that
* do not need to return a result.
*
* @param <X> the checked exception type the callback may throw
* @param callback the database work to perform; receives an open {@link Handle} and must not
* commit or close it
* @param stepName a stable name that identifies this step within the workflow
* @throws X if the callback throws
*/
public <X extends Exception> void useStep(final HandleConsumer<X> callback, String stepName)
throws X {
inStep(
handle -> {
callback.useHandle(handle);
return null;
},
stepName);
}

@Override
protected Optional<StepResult> checkExecution(String workflowId, int stepId, String stepName) {
return jdbi.withHandle(
h ->
h.createQuery(checkSql())
.bind(0, workflowId)
.bind(1, stepId)
.map(
(rs, ctx) ->
new StepResult(
workflowId,
stepId,
stepName,
rs.getString("output"),
rs.getString("error"),
null,
rs.getString("serialization")))
.findOne());
}

private <R> void recordOutput(Handle handle, String workflowId, int stepId, R result) {
var value = SerializationUtil.serializeValue(result, null, serializer);
recordResult(handle, workflowId, stepId, value.serializedValue(), null, value.serialization());
}

@Override
protected void recordError(String workflowId, int stepId, Exception exception) {
var value = SerializationUtil.serializeError(exception, null, serializer);
jdbi.useTransaction(
h ->
recordResult(
h, workflowId, stepId, null, value.serializedValue(), value.serialization()));
}

private void recordResult(
Handle handle,
String workflowId,
int stepId,
String output,
String error,
String serialization) {
handle
.createUpdate(upsertSql())
.bind(0, workflowId)
.bind(1, stepId)
.bind(2, output)
.bind(3, error)
.bind(4, serialization)
.execute();
}
}
Loading
Loading