Skip to content

Commit c9f052c

Browse files
authored
Use seconds (and backoff) (#222)
1 parent a20d745 commit c9f052c

2 files changed

Lines changed: 16 additions & 8 deletions

File tree

transact/src/main/java/dev/dbos/transact/execution/QueueService.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,29 +25,34 @@ public class QueueService {
2525
private volatile boolean paused = false;
2626
private Thread workerThread;
2727
private CountDownLatch shutdownLatch;
28+
private double speedup = 1.0;
2829

2930
public QueueService(DBOSExecutor dbosExecutor, SystemDatabase systemDatabase) {
3031
this.systemDatabase = systemDatabase;
3132
this.dbosExecutor = dbosExecutor;
3233
}
3334

35+
public void setSpeedupForTest() {
36+
speedup = 0.01;
37+
}
38+
3439
@SuppressWarnings("deprecation") // Thread.currentThread().getId()
3540
private void pollForWorkflows() {
3641
logger.debug("PollQueuesThread started {}", Thread.currentThread().getId());
3742

38-
double pollingInterval = 1.0;
39-
double minPollingInterval = 1.0;
40-
double maxPollingInterval = 120.0;
41-
int randomSleep = 0;
43+
double pollingIntervalSec = 1.0;
44+
double minPollingIntervalSec = 1.0;
45+
double maxPollingIntervalSec = 120.0;
46+
double randomSleepFactor = 0;
4247

4348
try {
4449

4550
while (running) {
4651

47-
randomSleep = (int) (0.95 + ThreadLocalRandom.current().nextDouble(0.1));
52+
randomSleepFactor = (0.95 + ThreadLocalRandom.current().nextDouble(0.1));
4853

4954
try {
50-
Thread.sleep(randomSleep);
55+
Thread.sleep((long) (randomSleepFactor * pollingIntervalSec * 1000 * speedup));
5156
} catch (InterruptedException e) {
5257
Thread.currentThread().interrupt();
5358
logger.error("QueuesPollThread interrupted while sleeping");
@@ -76,12 +81,12 @@ private void pollForWorkflows() {
7681

7782
} catch (Exception e) {
7883

79-
pollingInterval = min(maxPollingInterval, pollingInterval * 2);
84+
pollingIntervalSec = min(maxPollingIntervalSec, pollingIntervalSec * 2);
8085
logger.error("Error executing queued workflow", e);
8186
}
8287
}
8388

84-
pollingInterval = max(minPollingInterval, pollingInterval * 0.9);
89+
pollingIntervalSec = max(minPollingIntervalSec, pollingIntervalSec * 0.9);
8590
}
8691

8792
} finally {

transact/src/test/java/dev/dbos/transact/queue/QueuesTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,9 @@ public void testLimiter() throws Exception {
292292
ServiceQ serviceQ = DBOS.registerWorkflows(ServiceQ.class, new ServiceQImpl());
293293

294294
DBOS.launch();
295+
var queueService = DBOSTestAccess.getQueueService();
296+
queueService.setSpeedupForTest();
297+
Thread.sleep(1000);
295298

296299
int numWaves = 3;
297300
int numTasks = numWaves * limit;

0 commit comments

Comments
 (0)