Skip to content

Commit ec27366

Browse files
authored
Introduce Kernel#Barrier as a top level scheduling operation. (#426)
1 parent 170376f commit ec27366

File tree

6 files changed

+261
-15
lines changed

6 files changed

+261
-15
lines changed

lib/async.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
require_relative "kernel/async"
1111
require_relative "kernel/sync"
12+
require_relative "kernel/barrier"
1213

1314
# Asynchronous programming framework.
1415
module Async

lib/async/idler.rb

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@ class Idler
1313
# @parameter maximum_load [Numeric] The maximum load before we start shedding work.
1414
# @parameter backoff [Numeric] The initial backoff time, used for delaying work.
1515
# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
16-
def initialize(maximum_load = 0.8, backoff: 0.01, parent: nil)
16+
def initialize(maximum_load = 0.8, backoff: 0.001, parent: nil)
1717
@maximum_load = maximum_load
1818
@backoff = backoff
19+
@current = backoff
20+
1921
@parent = parent
22+
@mutex = Mutex.new
2023
end
2124

2225
# Wait until the system is idle, then execute the given block in a new task.
@@ -38,20 +41,29 @@ def async(*arguments, parent: (@parent or Task.current), **options, &block)
3841
#
3942
# If the scheduler is overloaded, this method will sleep for an exponentially increasing amount of time.
4043
def wait
41-
scheduler = Fiber.scheduler
42-
backoff = nil
43-
44-
while true
45-
load = scheduler.load
46-
47-
break if load < @maximum_load
44+
@mutex.synchronize do
45+
scheduler = Fiber.scheduler
4846

49-
if backoff
50-
sleep(backoff)
51-
backoff *= 2.0
52-
else
53-
scheduler.yield
54-
backoff = @backoff
47+
while true
48+
load = scheduler.load
49+
50+
if load <= @maximum_load
51+
# Even though load is okay, if @current is high, we were recently overloaded. Sleep proportionally to prevent burst after load drop:
52+
if @current > @backoff
53+
# Sleep a fraction of @current to rate limit:
54+
sleep(@current - @backoff)
55+
56+
# Decay @current gently towards @backoff:
57+
alpha = 0.99
58+
@current *= alpha + (1.0 - alpha) * (load / @maximum_load)
59+
end
60+
61+
break
62+
else
63+
# We're overloaded, so increase backoff:
64+
@current *= (load / @maximum_load)
65+
sleep(@current)
66+
end
5567
end
5668
end
5769
end

lib/kernel/barrier.rb

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require_relative "sync"
7+
require_relative "../async/barrier"
8+
require_relative "../async/idler"
9+
10+
module Kernel
11+
# Create a barrier, yield it to the block, and then wait for all tasks to complete.
12+
#
13+
# If no scheduler is running, one will be created automatically for the duration of the block.
14+
#
15+
# By default, the barrier uses an `Async::Idler` to manage load, but this can be overridden by providing a different parent or `nil` to disable load management.
16+
#
17+
# @parameter parent [Task | Semaphore | Nil] The parent for holding any children tasks.
18+
# @parameter **options [Hash] Additional options passed to {Kernel::Sync}.
19+
# @public Since *Async v2.34*.
20+
def Barrier(parent: Async::Idler.new, **options)
21+
Sync(**options) do |task|
22+
barrier = ::Async::Barrier.new(parent: parent)
23+
24+
yield barrier
25+
26+
barrier.wait
27+
ensure
28+
barrier&.stop
29+
end
30+
end
31+
end

releases.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,28 @@
11
# Releases
22

3+
## Unreleased
4+
5+
### `Kernel::Barrier` Convenience Interface
6+
7+
Starting multiple concurrent tasks and waiting for them to finish is a common pattern. This change introduces a small ergonomic helper, `Barrier`, defined in `Kernel`, that encapsulates this behavior: it creates an `Async::Barrier`, yields it to a block, waits for completion (using `Sync` to run a reactor if needed), and ensures remaining tasks are stopped on exit.
8+
9+
``` ruby
10+
require 'async'
11+
12+
Barrier do |barrier|
13+
3.times do |i|
14+
barrier.async do |task|
15+
sleep(rand * 0.1) # Simulate work
16+
puts "Task #{i} completed"
17+
end
18+
end
19+
end
20+
21+
# All tasks are guaranteed to complete or be stopped when the block exits.
22+
```
23+
24+
If an exception is raised by a task, it will be propagated to the caller, and any remaining tasks will be stopped. The `parent:` parameter can be used to specify a parent task for the barrier, otherwise it will use the current task if available, or create a new reactor if not.
25+
326
## v2.33.0
427

528
- Introduce `Async::Promise.fulfill` for optional promise resolution.

test/async/idler.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
end
2828

2929
# This test must be longer than the idle calculation window (1s)...
30-
sleep 1.1
30+
sleep 2.0
3131

3232
# Verify that the load is within the desired range:
3333
expect(Fiber.scheduler.load).to be_within(0.2).of(0.5)

test/kernel/barrier.rb

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require "kernel/barrier"
7+
8+
describe Kernel do
9+
with "#Barrier" do
10+
it "should create a barrier, yield it, wait, and stop automatically" do
11+
finished = 0
12+
results = []
13+
14+
Barrier do |barrier|
15+
3.times do |i|
16+
barrier.async do |task|
17+
sleep(0.01 * i) # Different completion times
18+
results << "task_#{i}"
19+
finished += 1
20+
end
21+
end
22+
end
23+
24+
expect(finished).to be == 3
25+
expect(results.size).to be == 3
26+
end
27+
28+
it "should handle exceptions and still clean up properly" do
29+
exception_raised = false
30+
31+
expect do
32+
Barrier do |barrier|
33+
barrier.async do |task|
34+
raise "Test exception"
35+
end
36+
37+
barrier.async do |task|
38+
sleep(0.1) # This should be stopped
39+
end
40+
end
41+
end.to raise_exception(RuntimeError, message: be =~ /Test exception/)
42+
43+
# The barrier should have been cleaned up despite the exception.
44+
end
45+
46+
it "should support parent parameter" do
47+
parent_task = nil
48+
child_task = nil
49+
50+
Sync do |task|
51+
parent_task = task
52+
53+
Barrier(parent: task) do |barrier|
54+
barrier.async do |async_task|
55+
child_task = async_task
56+
# While the child task is running, parent should be set:
57+
expect(child_task.parent).to be == parent_task
58+
sleep(0.01)
59+
end
60+
end
61+
end
62+
63+
expect(child_task).not.to be_nil
64+
end
65+
66+
it "should wait for all tasks to complete before returning" do
67+
completion_order = []
68+
69+
Barrier do |barrier|
70+
3.times do |i|
71+
barrier.async do |task|
72+
sleep(0.01 * (3 - i)) # Reverse order completion
73+
completion_order << i
74+
end
75+
end
76+
end
77+
78+
# All tasks should have completed
79+
expect(completion_order.size).to be == 3
80+
expect(completion_order.sort).to be == [0, 1, 2]
81+
end
82+
83+
it "should stop remaining tasks when block exits early" do
84+
tasks = []
85+
86+
Sync do |parent|
87+
begin
88+
Barrier do |barrier|
89+
3.times do |i|
90+
tasks << barrier.async do |task|
91+
sleep(1) # Long running task
92+
end
93+
end
94+
95+
# Simulate early exit due to some condition
96+
raise "Early exit"
97+
end
98+
rescue => e
99+
# Expected exception
100+
end
101+
102+
# Wait for tasks to finish/stopped deterministically
103+
tasks.each do |t|
104+
begin
105+
t.wait
106+
rescue => e
107+
# ignore errors from waiting on stopped tasks
108+
end
109+
end
110+
end
111+
112+
# All three tasks should have been stopped
113+
expect(tasks.map(&:stopped?).all?).to be == true
114+
end
115+
116+
it "should handle empty barriers gracefully" do
117+
result = nil
118+
119+
expect do
120+
result = Async::Barrier() do |barrier|
121+
# No tasks added
122+
end
123+
end.not.to raise_exception
124+
125+
# Should complete successfully
126+
end
127+
128+
it "should create a barrier, yield it, wait, and stop automatically" do
129+
finished = 0
130+
results = []
131+
132+
Barrier do |barrier|
133+
3.times do |i|
134+
barrier.async do |task|
135+
sleep(0.01 * i)
136+
results << "task_#{i}"
137+
finished += 1
138+
end
139+
end
140+
end
141+
142+
expect(finished).to be == 3
143+
expect(results.size).to be == 3
144+
end
145+
146+
it "should handle exceptions and still clean up properly" do
147+
expect do
148+
Barrier do |barrier|
149+
barrier.async do |task|
150+
raise "Kernel helper exception"
151+
end
152+
barrier.async do |task|
153+
sleep(0.1)
154+
end
155+
end
156+
end.to raise_exception(RuntimeError, message: be =~ /Kernel helper exception/)
157+
end
158+
159+
it "should support parent parameter" do
160+
parent_task = nil
161+
child_task = nil
162+
163+
Sync do |task|
164+
parent_task = task
165+
166+
Barrier(parent: task) do |barrier|
167+
barrier.async do |async_task|
168+
child_task = async_task
169+
# While the child task is running, parent should be set:
170+
expect(child_task.parent).to be == parent_task
171+
sleep(0.01)
172+
end
173+
end
174+
end
175+
176+
expect(child_task).not.to be_nil
177+
end
178+
end
179+
end

0 commit comments

Comments
 (0)