Skip to content

Commit cc4cc7e

Browse files
authored
support plain DataSource (#268)
DBOS uses `HikariDataSource` if a data source is not provided. This PR updated sysdb + dbos config to support base `DataSource` (i.e. non Hikari data sources) Also added DBOS.registerQueues to register more than one queue at a time
1 parent d44766b commit cc4cc7e

15 files changed

Lines changed: 139 additions & 147 deletions

File tree

transact/src/main/java/dev/dbos/transact/DBOS.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,18 @@ public void shutdown() {
272272
return queue;
273273
}
274274

275+
/**
276+
* Register a set of DBOS queues. Each queue must be registered prior to launch, so that recovery
277+
* has the queue options available.
278+
*
279+
* @param queues collection of `Queue` instances to register
280+
*/
281+
public static void registerQueues(@NonNull Queue... queues) {
282+
for (Queue queue : queues) {
283+
registerQueue(queue);
284+
}
285+
}
286+
275287
/**
276288
* Register a lifecycle listener that receives callbacks when DBOS is launched or shut down
277289
*

transact/src/main/java/dev/dbos/transact/DBOSClient.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
import java.util.Optional;
2020
import java.util.UUID;
2121

22-
import com.zaxxer.hikari.HikariDataSource;
22+
import javax.sql.DataSource;
23+
2324
import org.jspecify.annotations.NonNull;
2425
import org.jspecify.annotations.Nullable;
2526

@@ -87,7 +88,7 @@ public DBOSClient(
8788
*
8889
* @param dataSource System database data source
8990
*/
90-
public DBOSClient(@NonNull HikariDataSource dataSource) {
91+
public DBOSClient(@NonNull DataSource dataSource) {
9192
this(dataSource, null);
9293
}
9394

@@ -97,7 +98,7 @@ public DBOSClient(@NonNull HikariDataSource dataSource) {
9798
* @param dataSource System database data source
9899
* @param schema Database schema for DBOS tables
99100
*/
100-
public DBOSClient(@NonNull HikariDataSource dataSource, @Nullable String schema) {
101+
public DBOSClient(@NonNull DataSource dataSource, @Nullable String schema) {
101102
systemDatabase = new SystemDatabase(dataSource, schema);
102103
}
103104

transact/src/main/java/dev/dbos/transact/config/DBOSConfig.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
import java.util.stream.Collectors;
1010
import java.util.stream.Stream;
1111

12-
import com.zaxxer.hikari.HikariDataSource;
12+
import javax.sql.DataSource;
13+
1314
import org.jspecify.annotations.NonNull;
1415
import org.jspecify.annotations.Nullable;
1516

@@ -18,7 +19,7 @@ public record DBOSConfig(
1819
@Nullable String databaseUrl,
1920
@Nullable String dbUser,
2021
@Nullable String dbPassword,
21-
@Nullable HikariDataSource dataSource,
22+
@Nullable DataSource dataSource,
2223
boolean adminServer,
2324
int adminServerPort,
2425
boolean migrate,
@@ -146,7 +147,7 @@ public record DBOSConfig(
146147
listenQueues);
147148
}
148149

149-
public @NonNull DBOSConfig withDataSource(@Nullable HikariDataSource v) {
150+
public @NonNull DBOSConfig withDataSource(@Nullable DataSource v) {
150151
return new DBOSConfig(
151152
appName,
152153
databaseUrl,

transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,21 @@
1414
import java.util.Objects;
1515
import java.util.concurrent.TimeUnit;
1616

17-
import com.zaxxer.hikari.HikariDataSource;
17+
import javax.sql.DataSource;
18+
1819
import org.slf4j.Logger;
1920
import org.slf4j.LoggerFactory;
2021

2122
class NotificationsDAO {
2223

2324
private static final Logger logger = LoggerFactory.getLogger(NotificationsDAO.class);
2425

25-
private final HikariDataSource dataSource;
26+
private final DataSource dataSource;
2627
private final String schema;
2728
private NotificationService notificationService;
2829
private long dbPollingIntervalEventMs = 10000;
2930

30-
NotificationsDAO(HikariDataSource ds, NotificationService nService, String schema) {
31+
NotificationsDAO(DataSource ds, NotificationService nService, String schema) {
3132
this.dataSource = ds;
3233
this.schema = Objects.requireNonNull(schema);
3334
this.notificationService = nService;
@@ -41,10 +42,6 @@ void send(
4142
String workflowUuid, int functionId, String destinationUuid, Object message, String topic)
4243
throws SQLException {
4344

44-
if (dataSource.isClosed()) {
45-
throw new IllegalStateException("Database is closed!");
46-
}
47-
4845
var startTime = System.currentTimeMillis();
4946
String functionName = "DBOS.send";
5047
String finalTopic = (topic != null) ? topic : Constants.DBOS_NULL_TOPIC;
@@ -116,10 +113,6 @@ Object recv(
116113
String workflowUuid, int functionId, int timeoutFunctionId, String topic, Duration timeout)
117114
throws SQLException {
118115

119-
if (dataSource.isClosed()) {
120-
throw new IllegalStateException("Database is closed!");
121-
}
122-
123116
var startTime = System.currentTimeMillis();
124117
String functionName = "DBOS.recv";
125118
String finalTopic = (topic != null) ? topic : Constants.DBOS_NULL_TOPIC;
@@ -304,9 +297,6 @@ ON CONFLICT (workflow_uuid, key, function_id)
304297

305298
void setEvent(String workflowId, int functionId, String key, Object message, boolean asStep)
306299
throws SQLException {
307-
if (dataSource.isClosed()) {
308-
throw new IllegalStateException("Database is closed!");
309-
}
310300

311301
var startTime = System.currentTimeMillis();
312302
String functionName = "DBOS.setEvent";
@@ -353,9 +343,6 @@ void setEvent(String workflowId, int functionId, String key, Object message, boo
353343
Object getEvent(
354344
String targetUuid, String key, Duration timeout, GetWorkflowEventContext callerCtx)
355345
throws SQLException {
356-
if (dataSource.isClosed()) {
357-
throw new IllegalStateException("Database is closed!");
358-
}
359346

360347
var startTime = System.currentTimeMillis();
361348
String functionName = "DBOS.getEvent";

transact/src/main/java/dev/dbos/transact/database/QueuesDAO.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,18 @@
1111
import java.util.Map;
1212
import java.util.Objects;
1313

14-
import com.zaxxer.hikari.HikariDataSource;
14+
import javax.sql.DataSource;
15+
1516
import org.slf4j.Logger;
1617
import org.slf4j.LoggerFactory;
1718

1819
class QueuesDAO {
1920
private static final Logger logger = LoggerFactory.getLogger(QueuesDAO.class);
2021

21-
private final HikariDataSource dataSource;
22+
private final DataSource dataSource;
2223
private final String schema;
2324

24-
QueuesDAO(HikariDataSource ds, String schema) {
25+
QueuesDAO(DataSource ds, String schema) {
2526
this.dataSource = ds;
2627
this.schema = Objects.requireNonNull(schema);
2728
}
@@ -36,9 +37,6 @@ class QueuesDAO {
3637
*/
3738
List<String> getAndStartQueuedWorkflows(
3839
Queue queue, String executorId, String appVersion, String partitionKey) throws SQLException {
39-
if (dataSource.isClosed()) {
40-
throw new IllegalStateException("Database is closed!");
41-
}
4240

4341
if (partitionKey != null && partitionKey.length() == 0) {
4442
partitionKey = null;
@@ -272,9 +270,6 @@ THEN EXTRACT(epoch FROM NOW()) * 1000 + workflow_timeout_ms
272270
}
273271

274272
boolean clearQueueAssignment(String workflowId) throws SQLException {
275-
if (dataSource.isClosed()) {
276-
throw new IllegalStateException("Database is closed!");
277-
}
278273

279274
final String sql =
280275
"""
@@ -295,9 +290,6 @@ boolean clearQueueAssignment(String workflowId) throws SQLException {
295290
}
296291

297292
List<String> getQueuePartitions(String queueName) throws SQLException {
298-
if (dataSource.isClosed()) {
299-
throw new IllegalStateException("Database is closed!");
300-
}
301293

302294
final String sql =
303295
"""

transact/src/main/java/dev/dbos/transact/database/StepsDAO.java

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,33 +14,30 @@
1414
import java.util.List;
1515
import java.util.Objects;
1616

17-
import com.zaxxer.hikari.HikariDataSource;
17+
import javax.sql.DataSource;
18+
1819
import org.slf4j.Logger;
1920
import org.slf4j.LoggerFactory;
2021

2122
class StepsDAO {
2223

2324
private static final Logger logger = LoggerFactory.getLogger(StepsDAO.class);
2425

25-
private final HikariDataSource dataSource;
26+
private final DataSource dataSource;
2627
private final String schema;
2728

28-
StepsDAO(HikariDataSource ds, String schema) {
29+
StepsDAO(DataSource ds, String schema) {
2930
this.dataSource = ds;
3031
this.schema = Objects.requireNonNull(schema);
3132
}
3233

3334
static void recordStepResultTxn(
34-
HikariDataSource dataSource,
35+
DataSource dataSource,
3536
StepResult result,
3637
long startTimeEpochMs,
3738
long endTimeEpochMs,
3839
String schema)
3940
throws SQLException {
40-
if (dataSource.isClosed()) {
41-
throw new IllegalStateException("Database is closed!");
42-
}
43-
4441
try (Connection connection = dataSource.getConnection()) {
4542
recordStepResultTxn(result, startTimeEpochMs, endTimeEpochMs, connection, schema);
4643
}
@@ -197,10 +194,6 @@ static StepResult checkStepExecutionTxn(
197194

198195
List<StepInfo> listWorkflowSteps(String workflowId) throws SQLException {
199196

200-
if (dataSource.isClosed()) {
201-
throw new IllegalStateException("Database is closed!");
202-
}
203-
204197
final String sql =
205198
"""
206199
SELECT function_id, function_name, output, error, child_workflow_id, started_at_epoch_ms, completed_at_epoch_ms
@@ -283,17 +276,9 @@ void sleep(String workflowUuid, int functionId, Duration duration) throws SQLExc
283276
}
284277

285278
static Duration durableSleepDuration(
286-
HikariDataSource dataSource,
287-
String workflowUuid,
288-
int functionId,
289-
Duration duration,
290-
String schema)
279+
DataSource dataSource, String workflowUuid, int functionId, Duration duration, String schema)
291280
throws SQLException {
292281

293-
if (dataSource.isClosed()) {
294-
throw new IllegalStateException("Database is closed!");
295-
}
296-
297282
Objects.requireNonNull(schema);
298283
var startTime = System.currentTimeMillis();
299284
String functionName = "DBOS.sleep";

transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.time.Instant;
2020
import java.util.*;
2121

22+
import javax.sql.DataSource;
23+
2224
import com.zaxxer.hikari.HikariConfig;
2325
import com.zaxxer.hikari.HikariDataSource;
2426
import org.slf4j.Logger;
@@ -36,7 +38,7 @@ public static String sanitizeSchema(String schema) {
3638
return "\"%s\"".formatted(schema);
3739
}
3840

39-
private final HikariDataSource dataSource;
41+
private final DataSource dataSource;
4042
private final String schema;
4143
private final boolean created;
4244

@@ -46,7 +48,7 @@ public static String sanitizeSchema(String schema) {
4648
private final NotificationsDAO notificationsDAO;
4749
private final NotificationService notificationService;
4850

49-
private SystemDatabase(HikariDataSource dataSource, String schema, boolean created) {
51+
private SystemDatabase(DataSource dataSource, String schema, boolean created) {
5052
this.schema = sanitizeSchema(schema);
5153
this.dataSource = dataSource;
5254
this.created = created;
@@ -62,7 +64,7 @@ public SystemDatabase(String url, String user, String password, String schema) {
6264
this(createDataSource(url, user, password), schema, true);
6365
}
6466

65-
public SystemDatabase(HikariDataSource dataSource, String schema) {
67+
public SystemDatabase(DataSource dataSource, String schema) {
6668
this(dataSource, schema, false);
6769
}
6870

@@ -75,8 +77,11 @@ public static SystemDatabase create(DBOSConfig config) {
7577
}
7678
}
7779

78-
HikariConfig getConfig() {
79-
return dataSource;
80+
Optional<HikariConfig> getConfig() {
81+
if (dataSource instanceof HikariDataSource hds) {
82+
return Optional.of(hds);
83+
}
84+
return Optional.empty();
8085
}
8186

8287
Connection getSysDBConnection() throws SQLException {
@@ -112,8 +117,8 @@ public static HikariDataSource createDataSource(String url, String user, String
112117
@Override
113118
public void close() {
114119
notificationService.stop();
115-
if (created) {
116-
dataSource.close();
120+
if (created && dataSource instanceof HikariDataSource hikariDataSource) {
121+
hikariDataSource.close();
117122
}
118123
}
119124

@@ -164,7 +169,9 @@ private <T> T dbRetry(SqlSupplier<T> supplier) {
164169
}
165170
if (e instanceof SQLRecoverableException || isConnectionFailure(e)) {
166171
logger.warn("Recoverable connection error. Resetting client pool.", e);
167-
dataSource.getHikariPoolMXBean().softEvictConnections();
172+
if (dataSource instanceof HikariDataSource hikariDataSource) {
173+
hikariDataSource.getHikariPoolMXBean().softEvictConnections();
174+
}
168175
waitForRecovery(attempt, 2000);
169176
} else if (e instanceof SQLTransientException || isTransientState(e)) {
170177
logger.warn("Transient DB error. Retrying command.", e);

0 commit comments

Comments
 (0)