Skip to content

Commit d6c5428

Browse files
committed
Fix recurring task double-enqueue caused by wall-clock race condition
When a recurring task's ScheduledTask fires, it computes the next occurrence by calling `task.next_time`, which uses `Time.current` as the reference point. If the timer fires slightly early (e.g. 2ms before the minute boundary), or if there's any sub-second timing drift, `next_time` can return the *current* slot instead of the next one — causing a duplicate enqueue and a near-immediate re-fire. Example with "every minute" schedule: ScheduledTask fires at 11:59:59.998 1. Enqueues job for 12:00:00 ✓ 2. Calls next_time → Time.current is 11:59:59.998 → fugit returns 12:00:00 (the slot we just enqueued) → Schedules new ScheduledTask with ~2ms delay 3. New task fires, enqueues duplicate job for 12:00:00 ✗ Fix: compute the next occurrence from the known run_at of the slot that just fired, rather than from wall clock time. `next_time_after(run_at)` passes the time to fugit's `next_time(from)` parameter, so `next_time_after(12:00:00)` always returns 12:01:00 regardless of what the wall clock says.
1 parent 50defe2 commit d6c5428

4 files changed

Lines changed: 45 additions & 5 deletions

File tree

app/models/solid_queue/recurring_task.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ def delay_from_now
5151
[ (next_time - Time.current).to_f, 0.1 ].max
5252
end
5353

54+
def next_time_after(time)
55+
parsed_schedule.next_time(time).utc
56+
end
57+
5458
def next_time
5559
parsed_schedule.next_time.utc
5660
end

lib/solid_queue/scheduler/recurring_schedule.rb

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ def schedule_tasks
2626
end
2727
end
2828

29-
def schedule_task(task)
30-
scheduled_tasks[task.key] = schedule(task)
29+
def schedule_task(task, run_at: task.next_time)
30+
scheduled_tasks[task.key] = schedule(task, run_at: run_at)
3131
end
3232

3333
def unschedule_tasks
@@ -49,9 +49,11 @@ def reload_tasks
4949
@configured_tasks = SolidQueue::RecurringTask.where(key: task_keys).to_a
5050
end
5151

52-
def schedule(task)
53-
scheduled_task = Concurrent::ScheduledTask.new(task.delay_from_now, args: [ self, task, task.next_time ]) do |thread_schedule, thread_task, thread_task_run_at|
54-
thread_schedule.schedule_task(thread_task)
52+
def schedule(task, run_at: task.next_time)
53+
delay = [ (run_at - Time.current).to_f, 0.1 ].max
54+
55+
scheduled_task = Concurrent::ScheduledTask.new(delay, args: [ self, task, run_at ]) do |thread_schedule, thread_task, thread_task_run_at|
56+
thread_schedule.schedule_task(thread_task, run_at: thread_task.next_time_after(thread_task_run_at))
5557

5658
wrap_in_app_executor do
5759
thread_task.enqueue(at: thread_task_run_at)

test/models/solid_queue/recurring_task_test.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,18 @@ def perform
203203
assert_equal 4, job.priority
204204
end
205205

206+
test "next_time_after returns the next occurrence after the given time" do
207+
task = recurring_task_with(class_name: "JobWithoutArguments", schedule: "every minute")
208+
209+
# next_time_after a time exactly on the minute boundary should return the following minute
210+
time = Time.utc(2026, 3, 12, 1, 28, 0)
211+
assert_equal Time.utc(2026, 3, 12, 1, 29, 0), task.next_time_after(time)
212+
213+
# next_time_after a time just before the boundary should return that boundary
214+
time = Time.utc(2026, 3, 12, 1, 27, 59)
215+
assert_equal Time.utc(2026, 3, 12, 1, 28, 0), task.next_time_after(time)
216+
end
217+
206218
test "task configured with a command" do
207219
task = recurring_task_with(command: "JobBuffer.add('from_a_command')")
208220
enqueue_and_assert_performed_with_result(task, "from_a_command")

test/unit/scheduler_test.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,28 @@ class SchedulerTest < ActiveSupport::TestCase
1717
scheduler.stop
1818
end
1919

20+
test "single scheduler does not double-enqueue recurring tasks" do
21+
recurring_tasks = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } }
22+
scheduler = SolidQueue::Scheduler.new(recurring_tasks: recurring_tasks)
23+
scheduler.start
24+
25+
wait_while_with_timeout(4.seconds) { SolidQueue::RecurringExecution.count < 3 }
26+
scheduler.stop
27+
28+
skip_active_record_query_cache do
29+
# Each run_at should appear exactly once — no duplicates
30+
run_at_counts = SolidQueue::RecurringExecution.group(:run_at).count
31+
duplicates = run_at_counts.select { |_, count| count > 1 }
32+
assert_empty duplicates, "Expected no duplicate run_at values, but found: #{duplicates.inspect}"
33+
34+
# Number of jobs should match number of recurring executions (no extra jobs)
35+
assert_equal SolidQueue::Job.count, SolidQueue::RecurringExecution.count,
36+
"Expected one job per recurring execution, got #{SolidQueue::Job.count} jobs for #{SolidQueue::RecurringExecution.count} executions"
37+
end
38+
ensure
39+
scheduler&.stop
40+
end
41+
2042
test "run more than one instance of the scheduler with recurring tasks" do
2143
recurring_tasks = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } }
2244
schedulers = 2.times.collect do

0 commit comments

Comments
 (0)