44import dev .dbos .transact .database .SystemDatabase ;
55import dev .dbos .transact .json .DBOSSerializer ;
66import dev .dbos .transact .json .SerializationUtil ;
7+ import dev .dbos .transact .txstep .TransactionalRunnable .WrappedSqlException ;
78import dev .dbos .transact .workflow .internal .StepResult ;
89
910import java .sql .Connection ;
1314
1415import javax .sql .DataSource ;
1516
17+ import org .slf4j .Logger ;
18+ import org .slf4j .LoggerFactory ;
19+
1620/**
1721 * A {@link PostgresStepFactoryHelpers} implementation backed by plain JDBC {@link Connection}
1822 * objects.
3438 */
3539public class JdbcStepFactory {
3640
41+ private static final Logger logger = LoggerFactory .getLogger (JdbcStepFactory .class );
42+
43+ public static final String CHECK_SQL_TEMPLATE =
44+ """
45+ SELECT output, error, serialization
46+ FROM "%s".tx_step_outputs
47+ WHERE workflow_id = ? AND step_id = ?
48+ """ ;
49+
50+ public static final String UPSERT_SQL_TEMPLATE =
51+ """
52+ INSERT INTO "%s".tx_step_outputs
53+ (workflow_id, step_id, output, error, serialization)
54+ VALUES (?, ?, ?, ?, ?)
55+ ON CONFLICT DO NOTHING
56+ """ ;
57+
58+ /**
59+ * Verifies that the given connection is to a PostgreSQL database.
60+ *
61+ * @throws IllegalArgumentException if the database is not PostgreSQL
62+ * @throws SQLException if database metadata cannot be read
63+ */
64+ public static void ensurePostgres (Connection conn ) throws SQLException {
65+ var productName = conn .getMetaData ().getDatabaseProductName ();
66+ if (!productName .equalsIgnoreCase ("PostgreSQL" )) {
67+ throw new IllegalArgumentException (
68+ "PostgresStepFactory requires a PostgreSQL datasource, got: " + productName );
69+ }
70+ }
71+
72+ /**
73+ * Returns {@code true} if the named schema exists in the database.
74+ *
75+ * @throws SQLException if the query fails
76+ */
77+ public static boolean schemaExists (Connection conn , String schema ) throws SQLException {
78+ var sql = "SELECT schema_name FROM information_schema.schemata WHERE schema_name = ?" ;
79+ try (var stmt = conn .prepareStatement (sql )) {
80+ stmt .setString (1 , Objects .requireNonNull (schema , "schema must not be null" ));
81+ try (var rs = stmt .executeQuery ()) {
82+ return rs .next ();
83+ }
84+ }
85+ }
86+
87+ /**
88+ * Creates the named schema if it does not already exist.
89+ *
90+ * @throws SQLException if the DDL fails
91+ */
92+ public static void ensureSchema (Connection conn , String schema ) throws SQLException {
93+ Objects .requireNonNull (schema , "schema must not be null" );
94+ if (!schemaExists (conn , schema )) {
95+ try (var stmt = conn .createStatement ()) {
96+ stmt .execute ("CREATE SCHEMA IF NOT EXISTS \" %s\" " .formatted (schema ));
97+ }
98+ }
99+ }
100+
101+ /**
102+ * Returns {@code true} if the {@code tx_step_outputs} table exists in the named schema.
103+ *
104+ * @throws SQLException if the query fails
105+ */
106+ public static boolean tableExists (Connection conn , String schema ) throws SQLException {
107+ var sql = "SELECT 1 FROM information_schema.tables WHERE table_schema = ? AND table_name = ?" ;
108+ try (var stmt = conn .prepareStatement (sql )) {
109+ stmt .setString (1 , Objects .requireNonNull (schema , "schema must not be null" ));
110+ stmt .setString (2 , Objects .requireNonNull ("tx_step_outputs" , "tableName must not be null" ));
111+ try (var rs = stmt .executeQuery ()) {
112+ return rs .next ();
113+ }
114+ }
115+ }
116+
117+ /**
118+ * Creates the {@code tx_step_outputs} table in the named schema if it does not already exist.
119+ *
120+ * @throws SQLException if the DDL fails
121+ */
122+ public static void ensureTxOutputTable (Connection conn , String schema ) throws SQLException {
123+ Objects .requireNonNull (schema , "schema must not be null" );
124+ if (tableExists (conn , schema )) {
125+ return ;
126+ }
127+ logger .debug ("Creating tx_step_outputs table in schema={}" , schema );
128+ try (var stmt = conn .createStatement ()) {
129+ var ddlSql =
130+ """
131+ CREATE TABLE IF NOT EXISTS "%1$s".tx_step_outputs (
132+ workflow_id TEXT NOT NULL,
133+ step_id INT NOT NULL,
134+ output TEXT,
135+ error TEXT,
136+ serialization TEXT,
137+ created_at BIGINT NOT NULL DEFAULT (EXTRACT(EPOCH FROM now())*1000)::bigint,
138+ PRIMARY KEY (workflow_id, step_id)
139+ )"""
140+ .formatted (schema );
141+ stmt .execute (ddlSql );
142+ }
143+ }
144+
37145 private final DBOS dbos ;
38146 private final DataSource dataSource ;
39147 private final String schema ;
@@ -65,9 +173,9 @@ public JdbcStepFactory(DBOS dbos, DataSource dataSource, String schema, DBOSSeri
65173 this .serializer = serializer == null ? config .serializer () : serializer ;
66174
67175 try (var conn = dataSource .getConnection ()) {
68- PostgresStepFactoryHelpers . ensurePostgres (conn );
69- PostgresStepFactoryHelpers . ensureSchema (conn , this .schema );
70- PostgresStepFactoryHelpers . ensureTxOutputTable (conn , this .schema );
176+ ensurePostgres (conn );
177+ ensureSchema (conn , this .schema );
178+ ensureTxOutputTable (conn , this .schema );
71179 }
72180 }
73181
@@ -183,7 +291,7 @@ private static void close(Connection conn) {
183291 }
184292
185293 private Optional <StepResult > checkExecution (String workflowId , int stepId , String stepName ) {
186- var sql = PostgresStepFactoryHelpers . CHECK_SQL_TEMPLATE .formatted (this .schema );
294+ var sql = CHECK_SQL_TEMPLATE .formatted (this .schema );
187295 try (var conn = dataSource .getConnection ();
188296 var stmt = conn .prepareStatement (sql )) {
189297 stmt .setString (1 , workflowId );
@@ -209,7 +317,8 @@ private Optional<StepResult> checkExecution(String workflowId, int stepId, Strin
209317
210318 private <R > void recordOutput (Connection conn , String workflowId , int stepId , R result ) {
211319 var value = SerializationUtil .serializeValue (result , null , serializer );
212- recordResult (conn , workflowId , stepId , value .serializedValue (), null , value .serialization ());
320+ recordResult (
321+ conn , schema , workflowId , stepId , value .serializedValue (), null , value .serialization ());
213322 }
214323
215324 private <X extends Exception > void recordError (String workflowId , int stepId , X exception ) {
@@ -218,13 +327,20 @@ private <X extends Exception> void recordError(String workflowId, int stepId, X
218327 dataSource ,
219328 (Connection conn ) -> {
220329 recordResult (
221- conn , workflowId , stepId , null , value .serializedValue (), value .serialization ());
330+ conn ,
331+ schema ,
332+ workflowId ,
333+ stepId ,
334+ null ,
335+ value .serializedValue (),
336+ value .serialization ());
222337 return null ;
223338 });
224339 }
225340
226- private void recordResult (
341+ public static void recordResult (
227342 Connection conn ,
343+ String schema ,
228344 String workflowId ,
229345 int stepId ,
230346 String output ,
@@ -233,7 +349,7 @@ private void recordResult(
233349 if (output != null && error != null ) {
234350 throw new IllegalArgumentException ("attempted to record non null output and error result" );
235351 }
236- var sql = PostgresStepFactoryHelpers . UPSERT_SQL_TEMPLATE .formatted (schema );
352+ var sql = UPSERT_SQL_TEMPLATE .formatted (schema );
237353 try (var stmt = conn .prepareStatement (sql )) {
238354 stmt .setString (1 , workflowId );
239355 stmt .setInt (2 , stepId );
0 commit comments