Skip to content

Commit 2c407e7

Browse files
authored
getEvent + recv poll occasionally (#232)
Fixes #203
1 parent 3eb9cb5 commit 2c407e7

5 files changed

Lines changed: 84 additions & 114 deletions

File tree

transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java

Lines changed: 74 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,18 @@ public class NotificationsDAO {
2525
private final HikariDataSource dataSource;
2626
private final String schema;
2727
private NotificationService notificationService;
28+
private long dbPollingIntervalEventMs = 10000;
2829

2930
NotificationsDAO(HikariDataSource ds, NotificationService nService, String schema) {
3031
this.dataSource = ds;
3132
this.schema = Objects.requireNonNull(schema);
3233
this.notificationService = nService;
3334
}
3435

36+
void speedUpPollingForTest() {
37+
dbPollingIntervalEventMs = 100;
38+
}
39+
3540
public void send(
3641
String workflowUuid, int functionId, String destinationUuid, Object message, String topic)
3742
throws SQLException {
@@ -143,6 +148,11 @@ public Object recv(
143148
String payload = workflowUuid + "::" + finalTopic;
144149
NotificationService.LockConditionPair lockPair = new NotificationService.LockConditionPair();
145150

151+
// Timeout / deadline for the notification
152+
double actualTimeout = timeout.toMillis();
153+
var targetTime = System.currentTimeMillis() + actualTimeout;
154+
var checkedDBForSleep = false;
155+
146156
try {
147157
lockPair.lock.lock();
148158
boolean success = notificationService.registerNotificationCondition(payload, lockPair);
@@ -151,32 +161,42 @@ public Object recv(
151161
throw new DBOSWorkflowExecutionConflictException(workflowUuid);
152162
}
153163

154-
// Check if the key is already in the database. If not, wait for the
155-
// notification
156-
boolean hasExistingNotification = false;
157-
try (Connection conn = dataSource.getConnection()) {
158-
final String sql =
159-
"""
160-
SELECT topic FROM %s.notifications WHERE destination_uuid = ? AND topic = ?
161-
"""
162-
.formatted(this.schema);
163-
164-
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
165-
stmt.setString(1, workflowUuid);
166-
stmt.setString(2, finalTopic);
167-
try (ResultSet rs = stmt.executeQuery()) {
168-
hasExistingNotification = rs.next();
164+
while (true) {
165+
// Check if the key is already in the database. If not, wait for the
166+
// notification
167+
boolean hasExistingNotification = false;
168+
try (Connection conn = dataSource.getConnection()) {
169+
final String sql =
170+
"""
171+
SELECT topic FROM %s.notifications WHERE destination_uuid = ? AND topic = ?
172+
"""
173+
.formatted(this.schema);
174+
175+
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
176+
stmt.setString(1, workflowUuid);
177+
stmt.setString(2, finalTopic);
178+
try (ResultSet rs = stmt.executeQuery()) {
179+
hasExistingNotification = rs.next();
180+
}
169181
}
170182
}
171-
}
172183

173-
if (!hasExistingNotification) {
184+
if (hasExistingNotification) break;
185+
186+
var nowTime = System.currentTimeMillis();
187+
174188
// Wait for the notification
175-
// Support OAOO sleep
176-
double actualTimeout =
177-
StepsDAO.sleep(dataSource, workflowUuid, timeoutFunctionId, timeout, true, this.schema)
178-
.toMillis();
179-
long timeoutMs = (long) (actualTimeout);
189+
if (!checkedDBForSleep) {
190+
// Support OAOO sleep
191+
actualTimeout =
192+
StepsDAO.sleep(
193+
dataSource, workflowUuid, timeoutFunctionId, timeout, true, this.schema)
194+
.toMillis();
195+
checkedDBForSleep = true;
196+
targetTime = nowTime + actualTimeout;
197+
}
198+
if (nowTime >= targetTime) break;
199+
long timeoutMs = (long) (Math.min(targetTime - nowTime, dbPollingIntervalEventMs));
180200
lockPair.condition.await(timeoutMs, TimeUnit.MILLISECONDS);
181201
}
182202
} finally {
@@ -369,34 +389,43 @@ public Object getEvent(
369389
// Check if the key is already in the database. If not, wait for the
370390
// notification.
371391
Object value = null;
372-
373-
// Initial database check
374392
final String sql =
375393
"""
376394
SELECT value FROM %s.workflow_events WHERE workflow_uuid = ? AND key = ?
377395
"""
378396
.formatted(this.schema);
379397

380-
try (Connection conn = dataSource.getConnection();
381-
PreparedStatement stmt = conn.prepareStatement(sql)) {
398+
// Wait for the notification
399+
double actualTimeout = timeout.toMillis();
400+
var targetTime = System.currentTimeMillis() + actualTimeout;
401+
var checkedDBForSleep = false;
402+
var hasExistingNotification = false;
403+
404+
while (true) {
405+
// Database check
382406

383-
stmt.setString(1, targetUuid);
384-
stmt.setString(2, key);
407+
try (Connection conn = dataSource.getConnection();
408+
PreparedStatement stmt = conn.prepareStatement(sql)) {
385409

386-
try (ResultSet rs = stmt.executeQuery()) {
387-
if (rs.next()) {
388-
String serializedValue = rs.getString("value");
389-
Object[] valueArray = JSONUtil.deserializeToArray(serializedValue);
390-
value = valueArray == null ? null : valueArray[0];
410+
stmt.setString(1, targetUuid);
411+
stmt.setString(2, key);
412+
413+
try (ResultSet rs = stmt.executeQuery()) {
414+
if (rs.next()) {
415+
String serializedValue = rs.getString("value");
416+
Object[] valueArray = JSONUtil.deserializeToArray(serializedValue);
417+
value = valueArray == null ? null : valueArray[0];
418+
hasExistingNotification = true;
419+
}
391420
}
392421
}
393-
}
394422

395-
if (value == null) {
396-
// Wait for the notification
397-
double actualTimeout = timeout.toMillis();
398-
if (callerCtx != null) {
399-
// Support OAOO sleep for workflows
423+
if (hasExistingNotification) break;
424+
var nowTime = System.currentTimeMillis();
425+
if (nowTime > targetTime) break;
426+
427+
// Consult DB - part of timeout may have expired if sleep is durable.
428+
if (callerCtx != null & !checkedDBForSleep) {
400429
actualTimeout =
401430
StepsDAO.sleep(
402431
dataSource,
@@ -406,32 +435,20 @@ public Object getEvent(
406435
true, // skip_sleep
407436
this.schema)
408437
.toMillis();
438+
targetTime = System.currentTimeMillis() + actualTimeout;
439+
checkedDBForSleep = true;
440+
if (nowTime > targetTime) break;
409441
}
410442

411443
try {
412-
long timeoutms = (long) (actualTimeout);
444+
long timeoutms = (long) (targetTime - nowTime);
413445
logger.debug("Waiting for notification {}...", timeout);
414-
lockConditionPair.condition.await(timeoutms, TimeUnit.MILLISECONDS);
446+
lockConditionPair.condition.await(
447+
Math.min(timeoutms, dbPollingIntervalEventMs), TimeUnit.MILLISECONDS);
415448
} catch (InterruptedException e) {
416449
Thread.currentThread().interrupt();
417450
throw new RuntimeException("Interrupted while waiting for event", e);
418451
}
419-
420-
// Read the value from the database after notification
421-
try (Connection conn = dataSource.getConnection();
422-
PreparedStatement stmt = conn.prepareStatement(sql)) {
423-
424-
stmt.setString(1, targetUuid);
425-
stmt.setString(2, key);
426-
427-
try (ResultSet rs = stmt.executeQuery()) {
428-
if (rs.next()) {
429-
String serializedValue = rs.getString("value");
430-
Object[] valueArray = JSONUtil.deserializeToArray(serializedValue);
431-
value = valueArray == null ? null : valueArray[0];
432-
}
433-
}
434-
}
435452
}
436453

437454
// Record the output if it's in a workflow

transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,18 +75,9 @@ public void stop() {
7575
notificationService.stop();
7676
}
7777

78-
/**
79-
* Get workflow result by workflow ID
80-
*
81-
* @param workflowId The workflow ID
82-
* @return Optional containing the raw output string if workflow completed successfully, empty
83-
* otherwise
84-
*/
85-
public Optional<String> getWorkflowResult(String workflowId) {
86-
return DbRetry.call(
87-
() -> {
88-
return workflowDAO.getWorkflowResult(workflowId);
89-
});
78+
void speedUpPollingForTest() {
79+
workflowDAO.speedUpPollingForTest();
80+
notificationsDAO.speedUpPollingForTest();
9081
}
9182

9283
/**

transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java

Lines changed: 4 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -28,55 +28,15 @@ public class WorkflowDAO {
2828

2929
private final HikariDataSource dataSource;
3030
private final String schema;
31+
private long getResultPollingIntervalMs = 1000;
3132

3233
WorkflowDAO(HikariDataSource ds, String schema) {
3334
this.dataSource = ds;
3435
this.schema = Objects.requireNonNull(schema);
3536
}
3637

37-
public Optional<String> getWorkflowResult(String workflowId) throws SQLException {
38-
if (dataSource.isClosed()) {
39-
throw new IllegalStateException("Database is closed!");
40-
}
41-
42-
logger.debug("getWorkflowResult {}", workflowId);
43-
44-
String sql =
45-
"""
46-
SELECT status, output, error FROM %s.workflow_status WHERE workflow_uuid = ?;
47-
"""
48-
.formatted(this.schema);
49-
50-
try (Connection connection = dataSource.getConnection();
51-
PreparedStatement stmt = connection.prepareStatement(sql)) {
52-
53-
stmt.setString(1, workflowId);
54-
55-
try (ResultSet rs = stmt.executeQuery()) {
56-
if (rs.next()) {
57-
String status = rs.getString("status");
58-
59-
if (WorkflowState.SUCCESS.toString().equals(status)) {
60-
String output = rs.getString("output");
61-
return Optional.ofNullable(output);
62-
63-
} else if (WorkflowState.ERROR.toString().equals(status)) {
64-
String error = rs.getString("error");
65-
return Optional.ofNullable(error);
66-
}
67-
68-
// For other statuses (PENDING, RUNNING, etc.), return empty
69-
return Optional.empty();
70-
}
71-
72-
// No row found - return empty
73-
return Optional.empty();
74-
}
75-
76-
} catch (SQLException e) {
77-
logger.error("Error getting workflow {} result", workflowId, e);
78-
throw e;
79-
}
38+
void speedUpPollingForTest() {
39+
getResultPollingIntervalMs = 100;
8040
}
8141

8242
public WorkflowInitResult initWorkflowStatus(
@@ -633,7 +593,7 @@ public <T, E extends Exception> T awaitWorkflowResult(String workflowId) throws
633593
}
634594

635595
try {
636-
Thread.sleep(1000);
596+
Thread.sleep(getResultPollingIntervalMs);
637597
} catch (InterruptedException e) {
638598
Thread.currentThread().interrupt();
639599
throw new RuntimeException("Workflow polling interrupted for " + workflowId, e);

transact/src/test/java/dev/dbos/transact/database/DisruptiveServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public String wfPart2(String id1) {
7474
DBUtils.causeChaos(ds);
7575
DBOS.send(id1, "hello1", "topic");
7676
DBUtils.causeChaos(ds);
77-
var v1 = (String) DBOS.getEvent(id1, "key", Duration.ofSeconds(10));
77+
var v1 = (String) DBOS.getEvent(id1, "key", Duration.ofSeconds(5));
7878
DBUtils.causeChaos(ds);
7979
return "Part2" + v1;
8080
}

transact/src/test/java/dev/dbos/transact/database/SystemDatabaseTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import static org.junit.jupiter.api.Assertions.assertTrue;
77

88
import dev.dbos.transact.DBOS;
9+
import dev.dbos.transact.DBOSTestAccess;
910
import dev.dbos.transact.config.DBOSConfig;
1011
import dev.dbos.transact.exceptions.DBOSMaxRecoveryAttemptsExceededException;
1112
import dev.dbos.transact.exceptions.DBOSQueueDuplicatedException;
@@ -126,6 +127,7 @@ public void testSysDbWfDisruption() throws Exception {
126127
var dsvc = DBOS.registerWorkflows(DisruptiveService.class, dsvci, UUID.randomUUID().toString());
127128
dsvci.setSelf(dsvc);
128129
DBOS.launch();
130+
DBOSTestAccess.getSystemDatabase().speedUpPollingForTest();
129131
try {
130132
assertEquals("Hehehe", dsvc.dbLossBetweenSteps());
131133

0 commit comments

Comments
 (0)