Skip to content

Commit 053241c

Browse files
authored
Assorted SysDB Improvements (#317)
combination of improvements suggested by claude clode and Oracle's VSCode extension
1 parent 7ae3e7e commit 053241c

7 files changed

Lines changed: 385 additions & 178 deletions

File tree

transact/src/main/java/dev/dbos/transact/database/NotificationService.java

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.sql.Statement;
66
import java.util.Map;
77
import java.util.concurrent.ConcurrentHashMap;
8+
import java.util.concurrent.atomic.AtomicReference;
89
import java.util.concurrent.locks.Condition;
910
import java.util.concurrent.locks.ReentrantLock;
1011

@@ -25,8 +26,7 @@ public static class LockConditionPair {
2526
private static final Logger logger = LoggerFactory.getLogger(NotificationService.class);
2627

2728
private final Map<String, LockConditionPair> notificationsMap = new ConcurrentHashMap<>();
28-
private volatile boolean running = false;
29-
private Thread notificationListenerThread;
29+
private final AtomicReference<Thread> notificationListenerThread = new AtomicReference<>(null);
3030
private final DataSource dataSource;
3131

3232
public NotificationService(DataSource dataSource) {
@@ -46,21 +46,20 @@ public void unregisterNotificationCondition(String key) {
4646
}
4747

4848
public void start() {
49-
if (!running) {
50-
running = true;
51-
notificationListenerThread = new Thread(this::notificationListener, "NotificationListener");
52-
notificationListenerThread.setDaemon(true);
53-
notificationListenerThread.start();
49+
Thread t = new Thread(this::notificationListener, "NotificationListener");
50+
t.setDaemon(true);
51+
if (notificationListenerThread.compareAndSet(null, t)) {
52+
t.start();
5453
logger.debug("Notification listener started");
5554
}
5655
}
5756

5857
public void stop() {
59-
running = false;
60-
if (notificationListenerThread != null) {
61-
notificationListenerThread.interrupt();
58+
Thread t = notificationListenerThread.getAndSet(null);
59+
if (t != null) {
60+
t.interrupt();
6261
try {
63-
notificationListenerThread.join(5000); // Wait up to 5 seconds
62+
t.join(5000); // Wait up to 5 seconds
6463
} catch (InterruptedException e) {
6564
Thread.currentThread().interrupt();
6665
}
@@ -71,7 +70,7 @@ public void stop() {
7170
}
7271

7372
private void notificationListener() {
74-
while (running) {
73+
while (notificationListenerThread.get() == Thread.currentThread()) {
7574
Connection notificationConnection = null;
7675

7776
try {
@@ -89,11 +88,9 @@ private void notificationListener() {
8988

9089
logger.debug("Listening for PostgreSQL notifications");
9190

92-
while (running) {
93-
// Check for notifications with a timeout
94-
PGNotification[] notifications = pgConnection.getNotifications(1000); // 1
95-
// second
96-
// timeout
91+
while (notificationListenerThread.get() == Thread.currentThread()) {
92+
// Check for notifications with a one second timeout
93+
PGNotification[] notifications = pgConnection.getNotifications(1000);
9794

9895
if (notifications != null) {
9996
for (PGNotification notification : notifications) {
@@ -102,19 +99,21 @@ private void notificationListener() {
10299

103100
logger.debug("Received notification on channel: {}, payload: {}", channel, payload);
104101

105-
if ("dbos_notifications_channel".equals(channel)) {
106-
handleNotification(payload, "notifications");
107-
} else if ("dbos_workflow_events_channel".equals(channel)) {
108-
handleNotification(payload, "workflow_events");
109-
} else {
110-
logger.error("Unknown NOTIFY channel: {}", channel);
111-
}
102+
if (null == channel) {
103+
logger.error("Received notification with null channel. Payload: {}", payload);
104+
} else
105+
switch (channel) {
106+
case "dbos_notifications_channel" -> handleNotification(payload, "notifications");
107+
case "dbos_workflow_events_channel" ->
108+
handleNotification(payload, "workflow_events");
109+
default -> logger.error("Unknown NOTIFY channel: {}", channel);
110+
}
112111
}
113112
}
114113
}
115114

116115
} catch (Exception e) {
117-
if (running) {
116+
if (notificationListenerThread.get() == Thread.currentThread()) {
118117
logger.warn("Notification listener error: {}", e.getMessage());
119118
try {
120119
Thread.sleep(1000); // Wait before retrying

transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class NotificationsDAO {
2828
private final DataSource dataSource;
2929
private final String schema;
3030
private final DBOSSerializer serializer;
31-
private NotificationService notificationService;
31+
private final NotificationService notificationService;
3232
private long dbPollingIntervalEventMs = 10000;
3333

3434
NotificationsDAO(
@@ -169,7 +169,7 @@ Object recv(String workflowId, int stepId, int timeoutFunctionId, String topic,
169169
String finalTopic = (topic != null) ? topic : Constants.DBOS_NULL_TOPIC;
170170

171171
// First, check for previous executions
172-
StepResult recordedOutput = null;
172+
StepResult recordedOutput;
173173
try (Connection c = dataSource.getConnection()) {
174174
recordedOutput =
175175
StepsDAO.checkStepExecutionTxn(workflowId, stepId, functionName, c, this.schema);
@@ -207,7 +207,7 @@ Object recv(String workflowId, int stepId, int timeoutFunctionId, String topic,
207207
while (true) {
208208
// Check if the key is already in the database. If not, wait for the
209209
// notification
210-
boolean hasExistingNotification = false;
210+
boolean hasExistingNotification;
211211
try (Connection conn = dataSource.getConnection()) {
212212
final String sql =
213213
"""
@@ -437,8 +437,7 @@ Object getEvent(
437437
// Check for previous executions only if it's in a workflow
438438
if (callerCtx != null) {
439439

440-
StepResult recordedOutput = null;
441-
440+
StepResult recordedOutput;
442441
try (Connection conn = dataSource.getConnection()) {
443442
recordedOutput =
444443
StepsDAO.checkStepExecutionTxn(
@@ -506,7 +505,7 @@ Object getEvent(
506505
if (nowTime > targetTime) break;
507506

508507
// Consult DB - part of timeout may have expired if sleep is durable.
509-
if (callerCtx != null & !checkedDBForSleep) {
508+
if (callerCtx != null && !checkedDBForSleep) {
510509
actualTimeout =
511510
StepsDAO.durableSleepDuration(
512511
dataSource,

transact/src/main/java/dev/dbos/transact/database/QueuesDAO.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33
import dev.dbos.transact.workflow.Queue;
44
import dev.dbos.transact.workflow.WorkflowState;
55

6-
import java.sql.*;
6+
import java.sql.Connection;
7+
import java.sql.PreparedStatement;
8+
import java.sql.ResultSet;
9+
import java.sql.SQLException;
10+
import java.sql.Statement;
711
import java.time.Instant;
812
import java.util.ArrayList;
913
import java.util.HashMap;
@@ -259,7 +263,7 @@ THEN EXTRACT(epoch FROM NOW()) * 1000 + workflow_timeout_ms
259263
}
260264

261265
// Commit only if workflows were dequeued. Avoids WAL bloat and XID advancement.
262-
if (updatedWorkflowIds.size() > 0) {
266+
if (!updatedWorkflowIds.isEmpty()) {
263267
connection.commit();
264268
} else {
265269
connection.rollback();

transact/src/main/java/dev/dbos/transact/database/StepsDAO.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,11 @@ static Duration durableSleepDuration(
308308
Object deserialized =
309309
SerializationUtil.deserializeValue(
310310
recordedOutput.output(), recordedOutput.serialization(), serializer);
311-
endTime = ((Number) deserialized).longValue();
311+
if (deserialized instanceof Long durationLong) {
312+
endTime = durationLong;
313+
} else {
314+
throw new IllegalStateException("Recorded sleep timeout is not a number: " + deserialized);
315+
}
312316
} else {
313317
logger.debug(
314318
"Running sleep, workflow {}, id: {}, duration: {}", workflowUuid, functionId, duration);

0 commit comments

Comments
 (0)