Skip to content

Commit 2a2b2a4

Browse files
Use controller-owned signal wakeups
Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4
1 parent 1609b1a commit 2a2b2a4

1 file changed

Lines changed: 49 additions & 28 deletions

File tree

lib/async/container/controller.rb

Lines changed: 49 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
require_relative "best"
88

99
require "async"
10+
require "async/queue"
1011

1112
require_relative "statistics"
1213
require_relative "notify"
@@ -33,6 +34,7 @@ def initialize(notify: Notify.open!, container_class: Container, graceful_stop:
3334
@container = nil
3435
@signals = {}
3536
@signal_queue = ::Thread::Queue.new
37+
@event_queue = ::Thread::Queue.new
3638

3739
self.trap(SIGHUP) do
3840
self.restart
@@ -164,12 +166,7 @@ def restart
164166
if old_container
165167
Console.info(self, "Stopping old container...")
166168

167-
begin
168-
@stopping_container = old_container
169-
old_container&.stop(@graceful_stop)
170-
ensure
171-
@stopping_container = nil
172-
end
169+
stop_container(old_container, @graceful_stop)
173170
end
174171

175172
@notify&.ready!(size: @container.size, status: "Running with #{@container.size} children.")
@@ -221,7 +218,7 @@ def run
221218
process_signals(@container)
222219

223220
if container = @container
224-
container.sleep
221+
controller_sleep(container)
225222
end
226223

227224
process_signals(@container) if @container
@@ -236,20 +233,49 @@ def run
236233
self.stop(false)
237234
end
238235

236+
private def stop_container(container, graceful)
237+
parent = Async::Task.current
238+
task = parent.async(transient: true) do
239+
container.stop(graceful)
240+
end
241+
242+
while task.running?
243+
process_signals(@container)
244+
controller_sleep(container)
245+
process_signals(@container) if @container
246+
end
247+
248+
task.wait
249+
end
250+
239251
private def wait_until_ready(container)
240-
@waiting_container = container
252+
until container.status?(:ready)
253+
process_signals(container, graceful: container.status?(:ready))
254+
255+
break unless container.running?
256+
257+
controller_sleep(container)
258+
end
259+
end
260+
261+
private def controller_sleep(container = nil)
262+
parent = Async::Task.current
263+
result = Async::Queue.new
241264

242-
begin
243-
until container.status?(:ready)
244-
process_signals(container, graceful: container.status?(:ready))
245-
246-
break unless container.running?
247-
248-
container.sleep
265+
event_task = parent.async(transient: true) do
266+
result << @event_queue.pop
267+
end
268+
269+
container_task = if container
270+
parent.async(transient: true) do
271+
result << container.sleep
249272
end
250-
ensure
251-
@waiting_container = nil
252273
end
274+
275+
result.pop
276+
ensure
277+
event_task&.stop
278+
container_task&.stop
253279
end
254280

255281
private def process_signals(container = @container, graceful: @graceful_stop)
@@ -275,27 +301,22 @@ def run
275301
end
276302

277303
private def with_signal_handlers
278-
# I thought this was the default... but it doesn't always raise an exception unless you do this explicitly.
279-
wake_containers = proc do
280-
@container&.group&.health_check!
281-
@waiting_container&.group&.health_check!
282-
@stopping_container&.group&.health_check!
304+
queue_signal = proc do |signal|
305+
@signal_queue << signal
306+
@event_queue << true
283307
end
284308

285309
interrupt_action = Signal.trap(:INT) do
286-
@signal_queue << SIGINT
287-
wake_containers.call
310+
queue_signal.call(SIGINT)
288311
end
289312

290313
# SIGTERM behaves the same as SIGINT by default.
291314
terminate_action = Signal.trap(:TERM) do
292-
@signal_queue << SIGTERM
293-
wake_containers.call
315+
queue_signal.call(SIGTERM)
294316
end
295317

296318
hangup_action = Signal.trap(:HUP) do
297-
@signal_queue << SIGHUP
298-
wake_containers.call
319+
queue_signal.call(SIGHUP)
299320
end
300321

301322
yield

0 commit comments

Comments
 (0)