Skip to content

Commit 934d496

Browse files
authored
Fix concurrent applySchedules invocation (#450)
Port of dbos-inc/dbos-transact-py#741 to Java.

# Problem

`SchedulesDAO.applySchedules` implemented "create or replace" as a DELETE followed by an INSERT (with a freshly generated `schedule_id` every time). Two concurrent `applySchedules` calls for the same schedule name could interleave their delete/insert pairs and collide on the `schedule_name` unique constraint. Generating a new `schedule_id` on every apply also meant that a schedule's identity churned on every edit.

# Fix

- `SchedulesDAO`: replaced delete+insert with a single `INSERT ... ON CONFLICT (schedule_name) DO UPDATE`, making `applySchedules` idempotent and race-free. On conflict, `schedule_id`, `status`, and `last_fired_at` are preserved from the existing row; only the definition fields (workflow name/class, cron, context, backfill flag, timezone, queue) are updated.
- `SchedulerService`: since `schedule_id` no longer changes on re-apply, the poller can no longer rely on a new ID to detect an edited schedule. Added `RunningSchedule`, a record that snapshots each running schedule's definition fields (excluding identity/status/runtime state) together with its current `ScheduledFuture`; the snapshots are kept in a single map (`runningSchedules`) keyed by schedule id. Each poll compares the live schedule against its snapshot via `RunningSchedule.matches(...)`; on a mismatch, the schedule's future is cancelled and restarted with the new definition (no backfill on restart — backfill happens only on first start).
1 parent b036914 commit 934d496

6 files changed

Lines changed: 474 additions & 157 deletions

File tree

transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
import dev.dbos.transact.database.dao.ExternalStateDAO;
77
import dev.dbos.transact.database.dao.NotificationsDAO;
88
import dev.dbos.transact.database.dao.QueuesDAO;
9+
import dev.dbos.transact.database.dao.ScheduleRecord;
910
import dev.dbos.transact.database.dao.SchedulesDAO;
1011
import dev.dbos.transact.database.dao.StepsDAO;
1112
import dev.dbos.transact.database.dao.StreamsDAO;
@@ -612,6 +613,19 @@ public List<WorkflowSchedule> listSchedules(
612613
() -> SchedulesDAO.listSchedules(ctx, statuses, workflowNames, scheduleNamePrefixes));
613614
}
614615

616+
// Raw form of listSchedules used by the scheduler's poller; see ScheduleRecord for why.
617+
public List<ScheduleRecord> listScheduleRecords(
618+
List<ScheduleStatus> statuses,
619+
List<String> workflowNames,
620+
List<String> scheduleNamePrefixes) {
621+
return dbRetry(
622+
() -> SchedulesDAO.listScheduleRecords(ctx, statuses, workflowNames, scheduleNamePrefixes));
623+
}
624+
625+
public @Nullable DBOSSerializer serializer() {
626+
return ctx.serializer();
627+
}
628+
615629
public void pauseSchedule(String name) {
616630
dbRetry(() -> SchedulesDAO.pauseSchedule(ctx, name));
617631
}
transact/src/main/java/dev/dbos/transact/database/dao/ScheduleRecord.java

Lines changed: 53 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,53 @@
1+
package dev.dbos.transact.database.dao;
2+
3+
import dev.dbos.transact.json.DBOSSerializer;
4+
import dev.dbos.transact.json.SerializationUtil;
5+
import dev.dbos.transact.workflow.ScheduleStatus;
6+
import dev.dbos.transact.workflow.WorkflowSchedule;
7+
8+
import java.time.Instant;
9+
import java.time.ZoneId;
10+
11+
import org.jspecify.annotations.NonNull;
12+
import org.jspecify.annotations.Nullable;
13+
14+
// Raw form of a schedule row as read from the database: context is kept as its serialized string
15+
// rather than deserialized. This lets callers that only need to detect definition changes (the
16+
// scheduler's poller) compare schedules without deserializing, and without depending on the
17+
// equals() of whatever type the deserialized context happens to be. Callers that need the actual
18+
// context object convert via toWorkflowSchedule().
19+
public record ScheduleRecord(
20+
@Nullable String id,
21+
@NonNull String scheduleName,
22+
@NonNull String workflowName,
23+
@NonNull String className,
24+
@NonNull String cron,
25+
@NonNull ScheduleStatus status,
26+
@Nullable String context,
27+
@Nullable Instant lastFiredAt,
28+
boolean automaticBackfill,
29+
@Nullable ZoneId cronTimezone,
30+
@Nullable String queueName) {
31+
32+
public boolean isActive() {
33+
return status == ScheduleStatus.ACTIVE;
34+
}
35+
36+
public WorkflowSchedule toWorkflowSchedule(@Nullable DBOSSerializer serializer) {
37+
Object deserializedContext =
38+
SerializationUtil.deserializeValue(
39+
context, serializer != null ? serializer.name() : null, serializer);
40+
return new WorkflowSchedule(
41+
id,
42+
scheduleName,
43+
workflowName,
44+
className,
45+
cron,
46+
status,
47+
deserializedContext,
48+
lastFiredAt,
49+
automaticBackfill,
50+
cronTimezone,
51+
queueName);
52+
}
53+
}

transact/src/main/java/dev/dbos/transact/database/dao/SchedulesDAO.java

Lines changed: 76 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,7 @@ static void createSchedule(
4242
// language
4343
Objects.requireNonNull(schedule.status(), "status must not be null");
4444
Objects.requireNonNull(schedule.cron(), "cron must not be null");
45-
SchedulerService.CRON_PARSER.parse(schedule.cron());
45+
SchedulerService.CRON_PARSER.parse(schedule.cron()).validate();
4646

4747
String sql =
4848
"""
@@ -87,8 +87,21 @@ public static List<WorkflowSchedule> listSchedules(
8787
List<String> workflowNames,
8888
List<String> scheduleNamePrefixes)
8989
throws SQLException {
90+
return listScheduleRecords(ctx, statuses, workflowNames, scheduleNamePrefixes).stream()
91+
.map(r -> r.toWorkflowSchedule(ctx.serializer()))
92+
.toList();
93+
}
94+
95+
// Raw form of listSchedules: keeps context as its serialized string instead of deserializing it.
96+
// Used by the scheduler's poller to detect definition changes without relying on the equals() of
97+
// whatever type the deserialized context happens to be.
98+
public static List<ScheduleRecord> listScheduleRecords(
99+
DbContext ctx,
100+
List<ScheduleStatus> statuses,
101+
List<String> workflowNames,
102+
List<String> scheduleNamePrefixes)
103+
throws SQLException {
90104

91-
DBOSSerializer serializer = ctx.serializer();
92105
StringBuilder sql =
93106
new StringBuilder(
94107
"""
@@ -135,9 +148,9 @@ public static List<WorkflowSchedule> listSchedules(
135148
}
136149
}
137150
try (ResultSet rs = ps.executeQuery()) {
138-
List<WorkflowSchedule> results = new ArrayList<>();
151+
List<ScheduleRecord> results = new ArrayList<>();
139152
while (rs.next()) {
140-
results.add(rowToSchedule(rs, serializer));
153+
results.add(rowToScheduleRecord(rs));
141154
}
142155
return results;
143156
}
@@ -151,7 +164,12 @@ public static List<WorkflowSchedule> listSchedules(
151164

152165
public static Optional<WorkflowSchedule> getSchedule(DbContext ctx, String name)
153166
throws SQLException {
154-
DBOSSerializer serializer = ctx.serializer();
167+
return getScheduleRecord(ctx, name).map(r -> r.toWorkflowSchedule(ctx.serializer()));
168+
}
169+
170+
// Raw form of getSchedule: keeps context as its serialized string instead of deserializing it.
171+
public static Optional<ScheduleRecord> getScheduleRecord(DbContext ctx, String name)
172+
throws SQLException {
155173
String sql =
156174
"""
157175
SELECT schedule_id, schedule_name, workflow_name, workflow_class_name,
@@ -167,7 +185,7 @@ public static Optional<WorkflowSchedule> getSchedule(DbContext ctx, String name)
167185
ps.setString(1, name);
168186
try (ResultSet rs = ps.executeQuery()) {
169187
if (rs.next()) {
170-
return Optional.of(rowToSchedule(rs, serializer));
188+
return Optional.of(rowToScheduleRecord(rs));
171189
}
172190
return Optional.empty();
173191
}
@@ -239,8 +257,7 @@ public static void applySchedules(DbContext ctx, List<WorkflowSchedule> schedule
239257
conn.setAutoCommit(false);
240258
try {
241259
for (WorkflowSchedule schedule : schedules) {
242-
deleteSchedule(conn, ctx.schema(), schedule.scheduleName());
243-
createSchedule(
260+
upsertSchedule(
244261
conn,
245262
ctx.schema(),
246263
ctx.serializer(),
@@ -257,22 +274,67 @@ public static void applySchedules(DbContext ctx, List<WorkflowSchedule> schedule
257274
}
258275
}
259276

260-
private static WorkflowSchedule rowToSchedule(ResultSet rs, DBOSSerializer serializer)
277+
// Idempotent upsert by schedule_name, so concurrent applySchedules calls for the same name
278+
// can't race each other into a duplicate-key error. On conflict, schedule_id, status, and
279+
// last_fired_at are preserved from the existing row; the poller detects the changed
280+
// definition and restarts the schedule's future.
281+
private static void upsertSchedule(
282+
Connection conn, String schema, DBOSSerializer serializer, WorkflowSchedule schedule)
261283
throws SQLException {
262-
Object context =
263-
SerializationUtil.deserializeValue(
264-
rs.getString(7), serializer != null ? serializer.name() : null, serializer);
284+
285+
SchedulerService.CRON_PARSER.parse(schedule.cron()).validate();
286+
287+
String sql =
288+
"""
289+
INSERT INTO "%s".workflow_schedules
290+
(schedule_id, schedule_name, workflow_name, workflow_class_name,
291+
schedule, status, context, last_fired_at, automatic_backfill,
292+
cron_timezone, queue_name)
293+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
294+
ON CONFLICT (schedule_name) DO UPDATE SET
295+
workflow_name = EXCLUDED.workflow_name,
296+
workflow_class_name = EXCLUDED.workflow_class_name,
297+
schedule = EXCLUDED.schedule,
298+
context = EXCLUDED.context,
299+
automatic_backfill = EXCLUDED.automatic_backfill,
300+
cron_timezone = EXCLUDED.cron_timezone,
301+
queue_name = EXCLUDED.queue_name
302+
"""
303+
.formatted(schema);
304+
305+
var serializedContext =
306+
SerializationUtil.serializeValue(
307+
schedule.context(), serializer != null ? serializer.name() : null, serializer);
308+
309+
var timeZone = schedule.cronTimezone() == null ? null : schedule.cronTimezone().getId();
310+
try (PreparedStatement ps = conn.prepareStatement(sql)) {
311+
ps.setString(1, schedule.id() != null ? schedule.id() : UUID.randomUUID().toString());
312+
ps.setString(2, schedule.scheduleName());
313+
ps.setString(3, schedule.workflowName());
314+
ps.setString(4, schedule.className());
315+
ps.setString(5, schedule.cron());
316+
ps.setString(6, schedule.status().name());
317+
ps.setString(7, serializedContext.serializedValue());
318+
ps.setString(8, schedule.lastFiredAt() != null ? schedule.lastFiredAt().toString() : null);
319+
ps.setBoolean(9, schedule.automaticBackfill());
320+
ps.setString(10, timeZone);
321+
ps.setString(11, schedule.queueName());
322+
ps.executeUpdate();
323+
}
324+
}
325+
326+
private static ScheduleRecord rowToScheduleRecord(ResultSet rs) throws SQLException {
265327
String lastFiredAtStr = rs.getString(8);
266328
String timeZoneStr = rs.getString(10);
267329

268-
return new WorkflowSchedule(
330+
return new ScheduleRecord(
269331
rs.getString(1),
270332
rs.getString(2),
271333
rs.getString(3),
272334
rs.getString(4),
273335
rs.getString(5),
274336
ScheduleStatus.valueOf(rs.getString(6)),
275-
context,
337+
rs.getString(7),
276338
lastFiredAtStr != null ? Instant.parse(lastFiredAtStr) : null,
277339
rs.getBoolean(9),
278340
timeZoneStr != null ? ZoneId.of(timeZoneStr) : null,

0 commit comments

Comments
 (0)