Skip to content

Commit 1792345

Browse files
authored
pause scheduler for tests (#337)
Improve test reliability by: * Adding pause support to `SchedulerService` for use in tests. * Fixing an issue where WorkflowSchedules that were already scheduled were being scheduled again. Fixes #335
1 parent 2732dbc commit 1792345

6 files changed

Lines changed: 78 additions & 17 deletions

File tree

transact/src/main/java/dev/dbos/transact/execution/QueueService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ public void setSpeedupForTest() {
3838
speedup = 0.01;
3939
}
4040

41-
public synchronized void pause() {
41+
public void pause() {
4242
paused.set(true);
4343
}
4444

45-
public synchronized void unpause() {
45+
public void unpause() {
4646
paused.set(false);
4747
}
4848

transact/src/main/java/dev/dbos/transact/execution/SchedulerService.java

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.ScheduledFuture;
2323
import java.util.concurrent.ThreadLocalRandom;
2424
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicBoolean;
2526
import java.util.concurrent.atomic.AtomicReference;
2627
import java.util.stream.Collectors;
2728

@@ -67,6 +68,7 @@ public static void validateAnnotatedWorkflowSchedule(RegisteredWorkflow workflow
6768
private final SystemDatabase systemDatabase;
6869
private final Duration pollingInterval;
6970
private final AtomicReference<ScheduledExecutorService> execServiceRef = new AtomicReference<>();
71+
private final AtomicBoolean paused = new AtomicBoolean(false);
7072
private final ConcurrentHashMap<String, ScheduledFuture<?>> workflowScheduleFutures =
7173
new ConcurrentHashMap<>();
7274

@@ -98,6 +100,14 @@ public void close() {
98100
}
99101
}
100102

103+
public void pause() {
104+
paused.set(true);
105+
}
106+
107+
public void unpause() {
108+
paused.set(false);
109+
}
110+
101111
private void pollWorkflowSchedules() {
102112
try {
103113
pollWorkflowSchedulesImpl();
@@ -124,20 +134,24 @@ private void pollWorkflowSchedulesImpl() {
124134
}
125135

126136
// shut down any scheduled future that isn't in the list of current schedules
127-
var scheduleIds = schedules.stream().map(s -> s.id()).collect(Collectors.toSet());
137+
var currentIds = schedules.stream().map(WorkflowSchedule::id).collect(Collectors.toSet());
128138
for (var key : workflowScheduleFutures.keySet()) {
129-
if (!scheduleIds.contains(key)) {
139+
if (!currentIds.contains(key)) {
130140
cancelWorkflowSchedule(key);
131141
}
132142
}
133143

134144
for (var schedule : schedules) {
135-
145+
var scheduleRunning = workflowScheduleFutures.containsKey(schedule.id());
136146
if (!schedule.isActive()) {
137-
if (workflowScheduleFutures.containsKey(schedule.id())) {
147+
// if the schedule is no longer active but we still have a scheduled future for it, cancel
148+
// it
149+
if (scheduleRunning) {
138150
cancelWorkflowSchedule(schedule.id());
139151
}
140-
} else {
152+
} else if (!scheduleRunning) {
153+
// if the schedule is active but we don't yet have a scheduled future for it, schedule it
154+
// now
141155
var optRegWf = dbosExecutor.getWorkflow(schedule.workflowName(), schedule.className());
142156
if (optRegWf.isEmpty()) {
143157
logger.error(
@@ -199,12 +213,17 @@ public void schedule() {
199213
.ifPresent(
200214
cronTime -> {
201215
this.nextTime = cronTime.truncatedTo(ChronoUnit.SECONDS);
202-
// prevFuture should be null or a scheduled task that already fired.
203-
// but we still cancel it just to be sure
204216
var prevFuture =
205217
workflowScheduleFutures.put(
206218
wfSchedule.id(), scheduleTask(this.nextTime, this));
219+
// prevFuture should be null or a scheduled task that already fired.
220+
// cancel it anyway just to be sure
207221
if (prevFuture != null) {
222+
if (!prevFuture.isDone()) {
223+
logger.debug(
224+
"Previous scheduled task for {} has not yet completed",
225+
wfSchedule.scheduleName());
226+
}
208227
prevFuture.cancel(false);
209228
}
210229
});
@@ -219,6 +238,13 @@ public void run() {
219238
}
220239

221240
try {
241+
if (paused.get()) {
242+
logger.debug(
243+
"Skipping scheduled workflow {} schedule {} because scheduler is paused",
244+
regWorkflow.fullyQualifiedName(),
245+
wfSchedule.scheduleName());
246+
return;
247+
}
222248
var args = new Object[] {nextTime.toInstant(), wfSchedule.context()};
223249
var workflowId = "sched-%s-%s".formatted(wfSchedule.scheduleName(), nextTime);
224250
logger.debug(
@@ -284,6 +310,13 @@ public void run() {
284310

285311
var scheduledTime = nextTime;
286312
try {
313+
if (paused.get()) {
314+
logger.debug(
315+
"Skipping annotated workflow {} schedule {} because scheduler is paused",
316+
workflowName,
317+
swf.cron());
318+
return;
319+
}
287320
var args = new Object[] {scheduledTime.toInstant(), Instant.now()};
288321
var workflowId = "sched-%s-%s".formatted(workflowName, scheduledTime);
289322
logger.debug(

transact/src/test/java/dev/dbos/transact/client/ClientScheduleTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static org.junit.jupiter.api.Assertions.*;
44

55
import dev.dbos.transact.DBOS;
6+
import dev.dbos.transact.DBOSTestAccess;
67
import dev.dbos.transact.config.DBOSConfig;
78
import dev.dbos.transact.utils.PgContainer;
89
import dev.dbos.transact.workflow.ScheduleStatus;
@@ -218,6 +219,9 @@ public void clientTriggerScheduleNotFound() {
218219

219220
@Test
220221
public void clientBackfillSchedule() throws Exception {
222+
// Pause scheduler to prevent interference with backfill results
223+
DBOSTestAccess.getSchedulerService(dbos).pause();
224+
221225
try (var client = pgContainer.dbosClient()) {
222226
// Use a cron that won't fire during the test (every minute)
223227
client.createSchedule(
@@ -241,6 +245,9 @@ public void clientBackfillSchedule() throws Exception {
241245

242246
@Test
243247
public void clientBackfillScheduleEmptyWindow() {
248+
// Pause scheduler to prevent interference with backfill results
249+
DBOSTestAccess.getSchedulerService(dbos).pause();
250+
244251
try (var client = pgContainer.dbosClient()) {
245252
client.createSchedule(
246253
"backfill-empty", workflowName(), className(), "0/1 * * * * *", null, false, null, null);

transact/src/test/java/dev/dbos/transact/scheduled/ScheduledWorkflowService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@
55
import java.time.Instant;
66
import java.util.List;
77
import java.util.concurrent.CopyOnWriteArrayList;
8+
import java.util.concurrent.CountDownLatch;
89

910
interface ScheduledWorkflowService {
1011
void scheduledRun(Instant scheduled, Object context);
12+
13+
void latchedRun(Instant scheduled, Object context);
1114
}
1215

1316
class ScheduledWorkflowImpl implements ScheduledWorkflowService {
@@ -16,6 +19,7 @@ class ScheduledWorkflowImpl implements ScheduledWorkflowService {
1619
volatile Instant lastScheduled = null;
1720
volatile Object lastContext = null;
1821
final List<Instant> allScheduledTimes = new CopyOnWriteArrayList<>();
22+
final CountDownLatch latch = new CountDownLatch(3);
1923

2024
@Override
2125
@Workflow
@@ -26,6 +30,12 @@ public void scheduledRun(Instant scheduled, Object context) {
2630
allScheduledTimes.add(scheduled);
2731
}
2832

33+
@Override
34+
@Workflow
35+
public void latchedRun(Instant scheduled, Object context) {
36+
latch.countDown();
37+
}
38+
2939
void reset() {
3040
counter = 0;
3141
lastScheduled = null;

transact/src/test/java/dev/dbos/transact/scheduled/WorkflowScheduleTest.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static org.junit.jupiter.api.Assertions.*;
44

55
import dev.dbos.transact.DBOS;
6+
import dev.dbos.transact.DBOSTestAccess;
67
import dev.dbos.transact.config.DBOSConfig;
78
import dev.dbos.transact.utils.PgContainer;
89
import dev.dbos.transact.workflow.ScheduleStatus;
@@ -13,6 +14,7 @@
1314
import java.time.ZoneId;
1415
import java.time.temporal.ChronoUnit;
1516
import java.util.List;
17+
import java.util.concurrent.TimeUnit;
1618

1719
import com.zaxxer.hikari.HikariDataSource;
1820
import org.junit.jupiter.api.AutoClose;
@@ -401,6 +403,9 @@ public void backfillScheduleNotFound() {
401403
public void backfillScheduleCorrectTimes() throws Exception {
402404
var impl = registerAndLaunch();
403405

406+
// Pause scheduler to prevent interference with backfill results
407+
DBOSTestAccess.getSchedulerService(dbos).pause();
408+
404409
// Every minute at second 0: "0 * * * * *" (6-field cron)
405410
dbos.createSchedule(
406411
"backfill-correct", workflowName(), className(), "0 * * * * *", null, false, null, null);
@@ -430,6 +435,9 @@ public void backfillScheduleCorrectTimes() throws Exception {
430435
public void backfillScheduleHourly() throws Exception {
431436
var impl = registerAndLaunch();
432437

438+
// Pause scheduler to prevent interference with backfill results
439+
DBOSTestAccess.getSchedulerService(dbos).pause();
440+
433441
// Every hour at minute 0: "0 0 * * * *" (6-field cron, runs at top of each hour)
434442
dbos.createSchedule(
435443
"backfill-hourly", workflowName(), className(), "0 0 * * * *", null, false, null, null);
@@ -461,6 +469,9 @@ public void backfillScheduleHourly() throws Exception {
461469
public void backfillScheduleDaily() throws Exception {
462470
var impl = registerAndLaunch();
463471

472+
// Pause scheduler to prevent interference with backfill results
473+
DBOSTestAccess.getSchedulerService(dbos).pause();
474+
464475
// Every day at midnight: "0 0 0 * * *" (6-field cron)
465476
dbos.createSchedule(
466477
"backfill-daily", workflowName(), className(), "0 0 0 * * *", null, false, null, null);
@@ -535,18 +546,18 @@ public void scheduleRunsAfterPolling() throws Exception {
535546
var impl = registerAndLaunch();
536547

537548
dbos.createSchedule(
538-
"run-sched", workflowName(), className(), "0/1 * * * * *", null, false, null, null);
549+
"run-sched", "latchedRun", className(), "0/1 * * * * *", null, false, null, null);
539550

540551
// Verify schedule was created and is active
541552
var schedule = dbos.getSchedule("run-sched");
542553
assertTrue(schedule.isPresent(), "Schedule should be created");
543554
assertEquals(ScheduleStatus.ACTIVE, schedule.get().status());
544555

545-
// Allow time for scheduler polls + workflow executions
546-
// Schedule triggers at second boundary, so wait long enough for multiple executions
547-
Thread.sleep(8000);
548-
549-
assertTrue(impl.counter >= 2, "Expected at least 2 executions, got " + impl.counter);
550-
assertTrue(impl.counter <= 10, "Expected at most 10 executions, got " + impl.counter);
556+
// The latch is initialized with a count of 3, and each execution counts it down once.
557+
// Wait for all 3 counts to be released, which indicates the workflow ran at least 3 times
558+
// (scheduler should run it every second).
559+
assertTrue(
560+
impl.latch.await(5, TimeUnit.SECONDS),
561+
"Expected latch to count down to zero within 5 seconds");
551562
}
552563
}

transact/src/test/java/dev/dbos/transact/utils/PgContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public PgContainer() {
7474

7575
@Override
7676
public void close() throws Exception {
77-
// drop a database we created and return the container too the pool
77+
// drop the database we created and return the container to the pool
7878
var _jdbcUrl = pgContainer.getJdbcUrl();
7979
try (var conn = DriverManager.getConnection(_jdbcUrl, username(), password());
8080
var stmt = conn.createStatement()) {

0 commit comments

Comments
 (0)