Skip to content

Commit a8f7921

Browse files
Fix policy-driven container shutdown
Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4
1 parent 69c9fbb commit a8f7921

3 files changed

Lines changed: 111 additions & 18 deletions

File tree

lib/async/container/generic.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ def stop(timeout = true)
173173
@stopping = true
174174
@group.stop(timeout)
175175

176-
if @group.running?
176+
if @group.running?(except: current_task)
177177
Console.warn(self, "Group is still running after stopping it!")
178178
else
179179
Console.info(self, "Group has stopped.")
@@ -183,6 +183,12 @@ def stop(timeout = true)
183183
raise
184184
end
185185

186+
private def current_task
187+
Async::Task.current
188+
rescue RuntimeError
189+
nil
190+
end
191+
186192
protected def health_check_failed(child, age_clock, health_check_timeout)
187193
begin
188194
@policy.health_check_failed(

lib/async/container/group.rb

Lines changed: 73 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ def initialize(health_check_interval: 1.0)
3838

3939
@mutex = Mutex.new
4040
@children = {}
41-
@supervisors = 0
42-
@events = ::Thread::Queue.new
41+
@supervisors = {}
42+
@pending_events = 0
43+
@waiters = []
4344
end
4445

4546
# @attribute [Numeric | Nil] The interval used to wake waiters for periodic health checks.
@@ -58,9 +59,10 @@ def size
5859
end
5960

6061
# Check whether any supervisor tasks are still running.
62+
# @parameter except [Async::Task | Nil] The supervisor task to ignore.
6163
# @returns [Boolean] Whether any supervisor tasks are running.
62-
def running?
63-
@mutex.synchronize{@supervisors.positive?}
64+
def running?(except: nil)
65+
running_supervisors?(except: except)
6466
end
6567

6668
# Check whether any supervisor tasks are still running.
@@ -85,13 +87,16 @@ def running
8587
# @yields {...} The supervisor block to execute.
8688
# @returns [Async::Task] The supervisor task.
8789
def supervise(&block)
88-
@mutex.synchronize{@supervisors += 1}
90+
parent = Async::Task.current
8991

90-
Async::Task.current.async(transient: true) do
92+
parent.async(transient: true) do
93+
task = Async::Task.current
94+
@mutex.synchronize{@supervisors[task] = true}
95+
9196
begin
9297
block.call
9398
ensure
94-
@mutex.synchronize{@supervisors -= 1}
99+
@mutex.synchronize{@supervisors.delete(task)}
95100
signal!
96101
end
97102
end
@@ -116,15 +121,29 @@ def delete(child)
116121
# @parameter duration [Numeric | Nil] The maximum duration to sleep.
117122
# @returns [Object | Nil] The queued signal value, or `nil` if the sleep timed out.
118123
def sleep(duration = nil)
124+
events = ::Thread::Queue.new
125+
126+
@mutex.synchronize do
127+
if @pending_events.positive?
128+
@pending_events -= 1
129+
return true
130+
end
131+
132+
@waiters << events
133+
end
134+
119135
::Thread.handle_interrupt(SignalException => :immediate) do
120-
@events.pop(timeout: duration)
136+
events.pop(timeout: duration)
121137
end
138+
ensure
139+
@mutex.synchronize{@waiters.delete(events)} if events
122140
end
123141

124-
# Wait until all supervisor tasks have stopped.
142+
# Wait until all other supervisor tasks have stopped.
143+
# @parameter except [Async::Task | Nil] The supervisor task to ignore while waiting.
125144
# @returns [Nil]
126-
def wait
127-
sleep while running?
145+
def wait(except: current_supervisor)
146+
sleep while running_supervisors?(except: except)
128147
end
129148

130149
# Wake any waiters so they can re-check child health or state.
@@ -159,21 +178,26 @@ def kill
159178
# @returns [Nil]
160179
def stop(graceful = GRACEFUL_TIMEOUT)
161180
Console.debug(self, "Stopping all children...", graceful: graceful)
181+
except = current_supervisor
162182

163183
if graceful
164184
interrupt
165185

166186
graceful = DEFAULT_GRACEFUL_TIMEOUT if graceful == true
167-
wait_for_exit(Clock.start, graceful)
187+
wait_for_children(Clock.start, graceful)
168188
end
169189
ensure
170-
if running?
190+
if size.positive?
171191
if graceful
172192
Console.warn(self, "Killing children after graceful shutdown failed...", size: size)
173193
end
174194

175195
kill
176-
wait
196+
sleep while size.positive?
197+
end
198+
199+
if running_supervisors?(except: except)
200+
wait(except: except)
177201
end
178202
end
179203

@@ -189,8 +213,8 @@ def each_child
189213
end
190214
end
191215

192-
def wait_for_exit(clock, timeout)
193-
while running?
216+
def wait_for_children(clock, timeout)
217+
while size.positive?
194218
duration = timeout - clock.total
195219

196220
break if duration.negative?
@@ -199,8 +223,40 @@ def wait_for_exit(clock, timeout)
199223
end
200224
end
201225

226+
def current_supervisor
227+
task = Async::Task.current
228+
229+
@mutex.synchronize do
230+
@supervisors.key?(task) ? task : nil
231+
end
232+
rescue RuntimeError
233+
nil
234+
end
235+
236+
def running_supervisors?(except: nil)
237+
@mutex.synchronize do
238+
if except
239+
@supervisors.any?{|task, _| task != except}
240+
else
241+
@supervisors.any?
242+
end
243+
end
244+
end
245+
202246
def signal!
203-
@events << true
247+
waiters = @mutex.synchronize do
248+
if @waiters.empty?
249+
@pending_events += 1
250+
end
251+
252+
@waiters.dup
253+
end
254+
255+
waiters.each do |events|
256+
events << true
257+
end
258+
259+
return true
204260
end
205261
end
206262
end

test/async/container/policy.rb

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,37 @@ def child_exit(container, child, status, name:, key:, **options)
237237
expect(container.statistics.failures).to be == 1
238238
end
239239

240+
it "can stop the container from child_exit" do
241+
stop_policy = Class.new(Async::Container::Policy) do
242+
def initialize
243+
@stop_count = 0
244+
end
245+
246+
attr :stop_count
247+
248+
def child_exit(container, child, status, name:, key:, **options)
249+
unless container.stopping?
250+
@stop_count += 1
251+
container.stop
252+
end
253+
end
254+
end.new
255+
256+
container = Async::Container.best_container_class.new(policy: stop_policy)
257+
258+
3.times do |i|
259+
container.spawn(name: "worker-#{i}") do |instance|
260+
instance.ready!
261+
exit(1)
262+
end
263+
end
264+
265+
container.wait
266+
267+
expect(stop_policy.stop_count).to be == 1
268+
expect(container).not.to be(:running?)
269+
end
270+
240271
it "invokes callbacks for multiple children" do
241272
container = Async::Container.best_container_class.new(policy: tracking_policy)
242273

0 commit comments

Comments
 (0)