Skip to content

Commit 53a6318

Browse files
committed
Add load manangement.
1 parent 6f50729 commit 53a6318

2 files changed

Lines changed: 30 additions & 17 deletions

File tree

lib/async/idler.rb

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,13 @@ class Idler
1313
# @parameter maximum_load [Numeric] The maximum load before we start shedding work.
1414
# @parameter backoff [Numeric] The initial backoff time, used for delaying work.
1515
# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
16-
def initialize(maximum_load = 0.8, backoff: 0.01, parent: nil)
16+
def initialize(maximum_load = 0.8, backoff: 0.001, parent: nil)
1717
@maximum_load = maximum_load
1818
@backoff = backoff
19+
@current = backoff
20+
1921
@parent = parent
22+
@mutex = Mutex.new
2023
end
2124

2225
# Wait until the system is idle, then execute the given block in a new task.
@@ -38,20 +41,29 @@ def async(*arguments, parent: (@parent or Task.current), **options, &block)
3841
#
3942
# If the scheduler is overloaded, this method will sleep for an exponentially increasing amount of time.
4043
def wait
41-
scheduler = Fiber.scheduler
42-
backoff = nil
43-
44-
while true
45-
load = scheduler.load
46-
47-
break if load < @maximum_load
44+
@mutex.synchronize do
45+
scheduler = Fiber.scheduler
4846

49-
if backoff
50-
sleep(backoff)
51-
backoff *= 2.0
52-
else
53-
scheduler.yield
54-
backoff = @backoff
47+
while true
48+
load = scheduler.load
49+
50+
if load <= @maximum_load
51+
# Even though load is okay, if @current is high, we were recently overloaded. Sleep proportionally to prevent burst after load drop:
52+
if @current > @backoff
53+
# Sleep a fraction of @current to rate limit:
54+
sleep(@current - @backoff)
55+
56+
# Decay @current gently towards @backoff:
57+
alpha = 0.99
58+
@current *= alpha + (1.0 - alpha) * (load / @maximum_load)
59+
end
60+
61+
break
62+
else
63+
# We're overloaded, so increase backoff:
64+
@current *= (load / @maximum_load)
65+
sleep(@current)
66+
end
5567
end
5668
end
5769
end

lib/kernel/barrier.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,20 @@
55

66
require_relative "sync"
77
require_relative "../async/barrier"
8+
require_relative "../async/idler"
89

910
module Kernel
1011
# Create a barrier, yield it to the block, and then wait for all tasks to complete.
1112
#
1213
# If no scheduler is running, one will be created automatically for the duration of the block.
1314
#
15+
# By default, the barrier uses an `Async::Idler` to manage load, but this can be overridden by providing a different parent or `nil` to disable load management.
16+
#
1417
# @parameter parent [Task | Semaphore | Nil] The parent for holding any children tasks.
1518
# @parameter **options [Hash] Additional options passed to {Kernel::Sync}.
1619
# @public Since *Async v2.34*.
17-
def Barrier(parent: nil, **options)
20+
def Barrier(parent: Async::Idler.new, **options)
1821
Sync(**options) do |task|
19-
parent ||= task
20-
2122
barrier = ::Async::Barrier.new(parent: parent)
2223

2324
yield barrier

0 commit comments

Comments
 (0)