Skip to content

Commit 05b82bb

Browse files
committed
Introduce Async::Loop helper.
1 parent 941d78d commit 05b82bb

4 files changed

Lines changed: 319 additions & 0 deletions

File tree

lib/async.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
require_relative "async/version"
88
require_relative "async/reactor"
9+
require_relative "async/loop"
910

1011
require_relative "kernel/async"
1112
require_relative "kernel/sync"

lib/async/loop.rb

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2026, by Samuel Williams.
5+
6+
require "console"
7+
8+
module Async
9+
# @namespace
10+
module Loop
11+
# Execute a block repeatedly at quantized (time-aligned) intervals.
12+
#
13+
# The alignment is computed modulo the current clock time in seconds. For example, with
14+
# `interval: 60`, executions will occur at 00:00, 01:00, 02:00, etc., regardless of when
15+
# the loop is started. With `interval: 300` (5 minutes), executions align to 00:00, 00:05,
16+
# 00:10, etc.
17+
#
18+
# This is particularly useful for tasks that should run at predictable wall-clock times,
19+
# such as metrics collection, periodic cleanup, or scheduled jobs that need to align
20+
# across multiple processes.
21+
#
22+
# If an error occurs during block execution, it is logged and the loop continues.
23+
#
24+
# @example Run every minute at :00 seconds:
25+
# Async::Loop.quantized(interval: 60) do
26+
# puts "Current time: #{Time.now}"
27+
# end
28+
#
29+
# @example Run every 5 minutes aligned to the hour:
30+
# Async::Loop.quantized(interval: 300) do
31+
# collect_metrics
32+
# end
33+
#
34+
# @parameter interval [Numeric] The interval in seconds. Executions will align to multiples of this interval based on the current time.
35+
# @yields The block to execute at each interval.
36+
#
37+
# @public Since *Async v2.37*.
38+
def self.quantized(interval: 60, &block)
39+
while true
40+
# Compute the wait time to the next interval:
41+
wait = interval - (Time.now.to_f % interval)
42+
if wait.positive?
43+
# Sleep until the next interval boundary:
44+
sleep(wait)
45+
end
46+
47+
begin
48+
yield
49+
rescue => error
50+
Console.error(self, "Loop error:", error)
51+
end
52+
end
53+
end
54+
55+
# Execute a block repeatedly with a fixed delay between executions.
56+
#
57+
# Unlike {quantized}, this method waits for the specified interval *after* each execution
58+
# completes. This means the actual time between the start of successive executions will be
59+
# `interval + execution_time`.
60+
#
61+
# If an error occurs during block execution, it is logged and the loop continues.
62+
#
63+
# @example Run every 5 seconds (plus execution time):
64+
# Async::Loop.periodic(interval: 5) do
65+
# process_queue
66+
# end
67+
#
68+
# @parameter interval [Numeric] The delay in seconds between executions.
69+
# @yields The block to execute periodically.
70+
#
71+
# @public Since *Async v2.37*.
72+
def self.periodic(interval: 60, &block)
73+
while true
74+
begin
75+
yield
76+
rescue => error
77+
Console.error(self, "Loop error:", error)
78+
end
79+
80+
sleep(interval)
81+
end
82+
end
83+
end
84+
end

releases.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Introduce `Async::Loop` for robust, time-aligned loops.
6+
37
## v2.36.0
48

59
- Introduce `Task#wait_all` which recursively waits for all children and self, excepting the current task.

test/async/loop.rb

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2026, by Samuel Williams.
5+
6+
require "async/loop"
7+
require "sus/fixtures/console"
8+
9+
describe Async::Loop do
10+
include Sus::Fixtures::Console::CapturedLogger
11+
12+
with ".quantized" do
13+
it "invokes the block at aligned intervals" do
14+
iterations = 0
15+
thread = Thread.new do
16+
Async::Loop.quantized(interval: 0.1) do
17+
iterations += 1
18+
end
19+
end
20+
21+
sleep(0.35)
22+
expect(iterations).to be >= 2
23+
ensure
24+
thread.kill
25+
thread.join
26+
end
27+
28+
it "uses the given interval" do
29+
iterations = 0
30+
interval = 0.05
31+
thread = Thread.new do
32+
Async::Loop.quantized(interval: interval) do
33+
iterations += 1
34+
end
35+
end
36+
37+
sleep(0.2)
38+
expect(iterations).to be >= 2
39+
ensure
40+
thread.kill
41+
thread.join
42+
end
43+
44+
it "continues after an error and logs it" do
45+
iterations = 0
46+
thread = Thread.new do
47+
Async::Loop.quantized(interval: 0.05) do
48+
iterations += 1
49+
raise "test error" if iterations == 1
50+
end
51+
end
52+
53+
# Allow first iteration (raises), then at least one more (succeeds)
54+
sleep(0.2)
55+
expect(iterations).to be >= 2
56+
expect_console.to have_logged(
57+
severity: be == :error,
58+
subject: be_equal(Async::Loop),
59+
message: be =~ /Loop error:/
60+
)
61+
ensure
62+
thread.kill
63+
thread.join
64+
end
65+
66+
it "aligns executions to interval boundaries" do
67+
execution_times = []
68+
interval = 0.1
69+
70+
thread = Thread.new do
71+
Async::Loop.quantized(interval: interval) do
72+
execution_times << Time.now.to_f
73+
end
74+
end
75+
76+
sleep(0.35)
77+
ensure
78+
thread.kill
79+
thread.join
80+
81+
# Check that executions are roughly aligned to 0.1 second boundaries
82+
# Each execution time modulo interval should be close to 0
83+
if execution_times.size >= 2
84+
alignment_errors = execution_times.map{|t| t % interval}
85+
max_error = alignment_errors.max
86+
87+
# Allow some tolerance for scheduling delays
88+
expect(max_error).to be < 0.02
89+
end
90+
end
91+
92+
it "starts at the next interval boundary" do
93+
start_time = Time.now.to_f
94+
first_execution_time = nil
95+
interval = 0.1
96+
97+
thread = Thread.new do
98+
Async::Loop.quantized(interval: interval) do
99+
first_execution_time ||= Time.now.to_f
100+
end
101+
end
102+
103+
# Wait for first execution to complete
104+
sleep(0.15)
105+
ensure
106+
thread.kill
107+
thread.join
108+
109+
# The first execution should occur at the next interval boundary
110+
elapsed = first_execution_time - start_time
111+
expected_wait = interval - (start_time % interval)
112+
113+
# Allow some tolerance for scheduling
114+
expect(elapsed).to be_within(0.02).of(expected_wait)
115+
end
116+
end
117+
118+
with ".periodic" do
119+
it "executes the block repeatedly with fixed delays" do
120+
iterations = 0
121+
thread = Thread.new do
122+
Async::Loop.periodic(interval: 0.05) do
123+
iterations += 1
124+
end
125+
end
126+
127+
sleep(0.2)
128+
expect(iterations).to be >= 2
129+
ensure
130+
thread.kill
131+
thread.join
132+
end
133+
134+
it "waits after each execution completes" do
135+
execution_times = []
136+
interval = 0.05
137+
138+
thread = Thread.new do
139+
Async::Loop.periodic(interval: interval) do
140+
execution_times << Time.now.to_f
141+
end
142+
end
143+
144+
sleep(0.2)
145+
ensure
146+
thread.kill
147+
thread.join
148+
149+
# Check that there's approximately 'interval' time between executions
150+
if execution_times.size >= 2
151+
gaps = execution_times.each_cons(2).map{|a, b| b - a}
152+
153+
# Each gap should be at least the interval
154+
gaps.each do |gap|
155+
expect(gap).to be >= interval
156+
end
157+
end
158+
end
159+
160+
it "continues after an error and logs it" do
161+
iterations = 0
162+
thread = Thread.new do
163+
Async::Loop.periodic(interval: 0.05) do
164+
iterations += 1
165+
raise "periodic error" if iterations == 2
166+
end
167+
end
168+
169+
sleep(0.2)
170+
expect(iterations).to be >= 3
171+
expect_console.to have_logged(
172+
severity: be == :error,
173+
subject: be_equal(Async::Loop),
174+
message: be =~ /Loop error:/
175+
)
176+
ensure
177+
thread.kill
178+
thread.join
179+
end
180+
181+
it "executes immediately on first iteration" do
182+
start_time = Time.now.to_f
183+
first_execution_time = nil
184+
185+
thread = Thread.new do
186+
Async::Loop.periodic(interval: 0.1) do
187+
first_execution_time ||= Time.now.to_f
188+
end
189+
end
190+
191+
# Wait for first execution
192+
sleep(0.05)
193+
ensure
194+
thread.kill
195+
thread.join
196+
197+
# The first execution should happen almost immediately
198+
elapsed = first_execution_time - start_time
199+
expect(elapsed).to be < 0.01
200+
end
201+
202+
it "accounts for execution time in the interval" do
203+
execution_times = []
204+
execution_duration = 0.03
205+
interval = 0.05
206+
207+
thread = Thread.new do
208+
Async::Loop.periodic(interval: interval) do
209+
execution_times << Time.now.to_f
210+
sleep(execution_duration)
211+
end
212+
end
213+
214+
sleep(0.3)
215+
ensure
216+
thread.kill
217+
thread.join
218+
219+
# Time between starts should be approximately interval + execution_duration
220+
if execution_times.size >= 2
221+
gaps = execution_times.each_cons(2).map{|a, b| b - a}
222+
expected_gap = interval + execution_duration
223+
224+
gaps.each do |gap|
225+
expect(gap).to be_within(0.02).of(expected_gap)
226+
end
227+
end
228+
end
229+
end
230+
end

0 commit comments

Comments
 (0)