Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ aspectj = "1.9.22.1"
assertj = "3.27.3"
cron-utils = "9.2.1"
hikaricp = "7.0.2"
kryo = "5.6.2"
jackson = "2.21.2"
java-websocket = "1.6.0"
jaxb-api = "4.0.2"
jdbi = "3.47.0"
jooq = "3.19.15"
jspecify = "1.0.0"
junit = "6.0.3"
junit-pioneer = "2.3.0"
kotlin = "2.3.10"
kryo = "5.6.2"
logback = "1.5.32"
maven-artifact = "3.9.13"
maven-publish = "0.36.0"
Expand All @@ -20,10 +23,10 @@ postgresql = "42.7.10"
rest-assured = "6.0.0"
shadow = "9.4.1"
slf4j = "2.0.17"
sqlite-jdbc = "3.49.1.0"
spotless = "8.4.0"
spring-boot = "3.4.4"
spring-framework = "6.2.5"
sqlite-jdbc = "3.49.1.0"
system-stubs = "2.1.8"
testcontainers = "2.0.4"
versions = "0.53.0"
Expand All @@ -37,6 +40,9 @@ hikaricp = { module = "com.zaxxer:HikariCP", version.ref = "hikaricp" }
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" }
jackson-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" }
java-websocket = { module = "org.java-websocket:Java-WebSocket", version.ref = "java-websocket" }
jaxb-api = { module = "jakarta.xml.bind:jakarta.xml.bind-api", version.ref = "jaxb-api" }
jdbi-core = { module = "org.jdbi:jdbi3-core", version.ref = "jdbi" }
jooq = { module = "org.jooq:jooq", version.ref = "jooq" }
jspecify = { module = "org.jspecify:jspecify", version.ref = "jspecify" }
junit-bom = { module = "org.junit:junit-bom", version.ref = "junit" }
junit-jupiter = { module = "org.junit.jupiter:junit-jupiter" }
Expand All @@ -51,11 +57,11 @@ postgresql = { module = "org.postgresql:postgresql", version.ref = "postgresql"
rest-assured = { module = "io.rest-assured:rest-assured", version.ref = "rest-assured" }
slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
sqlite-jdbc = { module = "org.xerial:sqlite-jdbc", version.ref = "sqlite-jdbc" }
spring-aop = { module = "org.springframework:spring-aop", version.ref = "spring-framework" }
spring-boot-autoconfigure = { module = "org.springframework.boot:spring-boot-autoconfigure", version.ref = "spring-boot" }
spring-boot-configuration-processor = { module = "org.springframework.boot:spring-boot-configuration-processor", version.ref = "spring-boot" }
spring-boot-test = { module = "org.springframework.boot:spring-boot-test", version.ref = "spring-boot" }
sqlite-jdbc = { module = "org.xerial:sqlite-jdbc", version.ref = "sqlite-jdbc" }
system-stubs-jupiter = { module = "uk.org.webcompere:system-stubs-jupiter", version.ref = "system-stubs" }
testcontainers-postgresql = { module = "org.testcontainers:testcontainers-postgresql", version.ref = "testcontainers" }

Expand Down
8 changes: 7 additions & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
rootProject.name = "dbos-transact-java"

include("transact", "transact-cli", "transact-spring-boot-starter")
include(
"transact",
"transact-cli",
"transact-spring-boot-starter",
"transact-jdbi-step-factory",
"transact-jooq-step-factory",
)

plugins { id("org.gradle.toolchains.foojay-resolver") version "1.0.0" }

Expand Down
22 changes: 22 additions & 0 deletions transact-jdbi-step-factory/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
plugins { id("java-library") }

tasks.withType<JavaCompile> {
options.compilerArgs.add("-Xlint:unchecked")
options.compilerArgs.add("-Xlint:deprecation")
options.compilerArgs.add("-Xlint:rawtypes")
options.compilerArgs.add("-Werror")
}

dependencies {
api(project(":transact"))
api(libs.jdbi.core)

testImplementation(platform(libs.junit.bom))
testImplementation(libs.junit.jupiter)
testRuntimeOnly(libs.junit.platform.launcher)

testRuntimeOnly(libs.logback.classic)
testImplementation(libs.testcontainers.postgresql)
testImplementation(libs.postgresql)
testImplementation(libs.hikaricp)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package dev.dbos.transact.jdbi;

import dev.dbos.transact.DBOS;
import dev.dbos.transact.json.DBOSSerializer;
import dev.dbos.transact.txstep.PostgresStepFactory;
import dev.dbos.transact.workflow.internal.StepResult;

import java.sql.SQLException;

import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
import org.jspecify.annotations.Nullable;

/**
* A {@link PostgresStepFactory} implementation backed by Jdbi3 {@link Handle} objects.
*
* <p>Construct one with a {@link Jdbi} instance pointing at a PostgreSQL database. The constructor
* verifies the datasource is PostgreSQL and creates the {@code tx_step_outputs} table if needed.
* User lambdas passed to {@code txStep} receive a {@link Handle} with a transaction already
* started; they should not call {@code commit} or {@code close} themselves.
*
* <pre>{@code
* JdbiStepFactory factory = new JdbiStepFactory(dbos, Jdbi.create(dataSource));
*
* // inside a @Workflow method:
* int count = factory.txStep(handle -> {
* return handle.createUpdate("INSERT INTO ...").execute();
* }, "myStep");
* }</pre>
*/
public class JdbiStepFactory extends PostgresStepFactory<Handle> {

private final Jdbi jdbi;

/** Creates a factory using the schema from the DBOS config. */
public JdbiStepFactory(DBOS dbos, Jdbi jdbi) {
this(dbos, jdbi, null, null);
}

/** Creates a factory using a custom schema for {@code tx_step_outputs}. */
public JdbiStepFactory(DBOS dbos, Jdbi jdbi, String schema) {
this(dbos, jdbi, schema, null);
}

/** Creates a factory using a custom serializer. */
public JdbiStepFactory(DBOS dbos, Jdbi jdbi, DBOSSerializer serializer) {
this(dbos, jdbi, null, serializer);
}

/** Creates a factory with a custom schema and serializer. */
public JdbiStepFactory(DBOS dbos, Jdbi jdbi, String schema, DBOSSerializer serializer) {
super(dbos, schema, serializer);
this.jdbi = jdbi;
try {
jdbi.useHandle(
handle -> {
try {
PostgresStepFactory.ensurePostgres(handle.getConnection());
PostgresStepFactory.ensureSchema(handle.getConnection(), this.schema);
PostgresStepFactory.ensureTxOutputTable(handle.getConnection(), this.schema);
} catch (SQLException e) {
throw new RuntimeException(e.getMessage(), e);
}
});
} catch (Exception e) {
if (e instanceof RuntimeException re) {
throw re;
}
throw new RuntimeException(e);
}
}

@Override
protected Handle openTransaction() {
var handle = jdbi.open();
handle.begin();
return handle;
}

@Override
protected Handle openConnection() {
return jdbi.open();
}

@Override
protected void commit(Handle handle) {
handle.commit();
}

@Override
protected void rollback(Handle handle) {
handle.rollback();
}

@Override
protected void close(Handle handle) {
handle.close();
}

@Override
protected @Nullable StepResult checkExecution(String workflowId, int stepId, String stepName) {
var sql = CHECK_SQL_TEMPLATE.formatted(this.schema);
return jdbi.withHandle(
handle ->
handle
.createQuery(sql)
.bind(0, workflowId)
.bind(1, stepId)
.map(
(rs, ctx) ->
new StepResult(
workflowId,
stepId,
stepName,
rs.getString("output"),
rs.getString("error"),
null,
rs.getString("serialization")))
.findFirst()
.orElse(null));
}

@Override
protected void recordResult(
Handle handle,
String workflowId,
int stepId,
String output,
String error,
String serialization) {
if (output != null && error != null) {
throw new IllegalArgumentException("attempted to record non null output and error result");
}
handle
.createUpdate(UPSERT_SQL_TEMPLATE.formatted(schema))
.bind(0, workflowId)
.bind(1, stepId)
.bind(2, output)
.bind(3, error)
.bind(4, serialization)
.execute();
}
}
Loading
Loading