Skip to content

Commit ded924c

Browse files
Integrate controller events with child waits.
Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4
1 parent 02ce566 commit ded924c

7 files changed

Lines changed: 162 additions & 74 deletions

File tree

lib/async/container.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# Released under the MIT License.
44
# Copyright, 2017-2025, by Samuel Williams.
55

6+
require_relative "container/events"
67
require_relative "container/signals"
78
require_relative "container/controller"
89

lib/async/container/controller.rb

Lines changed: 47 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
require_relative "statistics"
1010
require_relative "notify"
1111
require_relative "policy"
12+
require_relative "events"
13+
require_relative "signals"
1214

1315
module Async
1416
module Container
@@ -41,10 +43,29 @@ def initialize(notify: Notify.open!, container_class: Container, graceful_stop:
4143
@graceful_stop = graceful_stop
4244

4345
@container = nil
44-
@signals = {}
46+
@events = Events.new
47+
@signals = Signals.new(@events)
4548

4649
self.trap(SIGHUP) do
4750
self.restart
51+
rescue SetupError => error
52+
Console.error(self, error)
53+
end
54+
55+
self.trap(SIGUSR1) do
56+
self.reload
57+
rescue SetupError => error
58+
Console.error(self, error)
59+
end
60+
61+
self.trap(SIGINT) do
62+
self.stop
63+
:stop
64+
end
65+
66+
self.trap(SIGTERM) do
67+
self.stop
68+
:stop
4869
end
4970
end
5071

@@ -80,7 +101,7 @@ def to_s
80101
# @parameters signal [Symbol] The signal to trap, e.g. `:INT`.
81102
# @parameters block [Proc] The signal handler to invoke.
82103
def trap(signal, &block)
83-
@signals[signal] = block
104+
@signals.trap(signal, &block)
84105
end
85106

86107
# Create a policy for managing child lifecycle events.
@@ -117,13 +138,19 @@ def setup(container)
117138
end
118139

119140
# Start the container unless it's already running.
141+
# @returns [Generic] The container.
120142
def start
121143
unless @container
122144
Console.info(self, "Controller starting...")
123-
self.restart
145+
146+
if self.restart(@events) == :event
147+
return
148+
end
124149
end
125150

126151
Console.info(self, "Controller started.")
152+
153+
return @container
127154
end
128155

129156
# Stop the container if it's running.
@@ -135,7 +162,7 @@ def stop(graceful = @graceful_stop)
135162

136163
# Restart the container. A new container is created, and if successful, any old container is terminated gracefully.
137164
# This is equivalent to a blue-green deployment.
138-
def restart
165+
def restart(events = nil)
139166
if @container
140167
@notify&.restarting!
141168

@@ -156,7 +183,11 @@ def restart
156183

157184
# Wait for all child processes to enter the ready state.
158185
Console.info(self, "Waiting for startup...")
159-
container.wait_until_ready
186+
187+
if container.wait_until_ready(events) == :event
188+
return :event
189+
end
190+
160191
Console.info(self, "Finished startup.")
161192

162193
if container.failed?
@@ -195,7 +226,7 @@ def reload
195226
begin
196227
self.setup(@container)
197228
rescue
198-
raise SetupError, container
229+
raise SetupError, @container
199230
end
200231

201232
# Wait for all child processes to enter the ready state.
@@ -212,66 +243,28 @@ def reload
212243
end
213244
end
214245

215-
# Enter the controller run loop, trapping `SIGINT` and `SIGTERM`.
246+
# Enter the controller run loop.
216247
def run
217248
@notify&.status!("Initializing controller...")
218249

219-
with_signal_handlers do
250+
@signals.trapped do
220251
self.start
221252

253+
while event = @events.pop(timeout: 0)
254+
return if event.call == :stop
255+
end
256+
222257
while @container&.running?
223-
begin
224-
@container.wait
225-
rescue SignalException => exception
226-
if handler = @signals[exception.signo]
227-
begin
228-
handler.call
229-
rescue SetupError => error
230-
Console.error(self, error)
231-
end
232-
else
233-
raise
234-
end
258+
@container.wait(@events)
259+
260+
while event = @events.pop(timeout: 0)
261+
return if event.call == :stop
235262
end
236263
end
237264
end
238-
rescue Interrupt
239-
self.stop
240-
rescue Terminate
241-
self.stop(false)
242265
ensure
243266
self.stop(false)
244267
end
245-
246-
private def with_signal_handlers
247-
# I thought this was the default... but it doesn't always raise an exception unless you do this explicitly.
248-
249-
interrupt_action = Signal.trap(:INT) do
250-
# We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly.
251-
# $stderr.puts "Received INT signal, interrupting...", caller
252-
::Thread.current.raise(Interrupt)
253-
end
254-
255-
# SIGTERM behaves the same as SIGINT by default.
256-
terminate_action = Signal.trap(:TERM) do
257-
# $stderr.puts "Received TERM signal, interrupting...", caller
258-
::Thread.current.raise(Interrupt) # Same as SIGINT
259-
end
260-
261-
hangup_action = Signal.trap(:HUP) do
262-
# $stderr.puts "Received HUP signal, restarting...", caller
263-
::Thread.current.raise(Restart)
264-
end
265-
266-
::Thread.handle_interrupt(SignalException => :never) do
267-
yield
268-
end
269-
ensure
270-
# Restore the interrupt handler:
271-
Signal.trap(:INT, interrupt_action)
272-
Signal.trap(:TERM, terminate_action)
273-
Signal.trap(:HUP, hangup_action)
274-
end
275268
end
276269
end
277270
end

lib/async/container/events.rb

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2026, by Samuel Williams.
5+
6+
module Async
7+
module Container
8+
# Represents a queue of events that can wake `IO.select`.
9+
class Events
10+
# Initialize the event queue.
11+
def initialize
12+
@queue = ::Thread::Queue.new
13+
@input, @output = ::IO.pipe
14+
@io = @input
15+
end
16+
17+
# @attribute [IO] The readable end of the event pipe.
18+
attr :input
19+
20+
# @attribute [IO] The readable end used to wait for events.
21+
attr :io
22+
23+
# Enqueue an event and wake any waiter.
24+
# @parameter event [Object] The event to enqueue.
25+
def <<(event)
26+
@queue << event
27+
28+
begin
29+
@output.write_nonblock(".")
30+
rescue ::IO::WaitWritable
31+
# The pipe is already full, so any select waiter is already awake:
32+
end
33+
34+
return self
35+
end
36+
37+
# Remove and return the next queued event.
38+
# @parameter non_block [Boolean] Whether to raise if no event is ready.
39+
# @parameter timeout [Numeric | Nil] The maximum time to wait for an event.
40+
def pop(non_block = false, timeout: nil)
41+
if timeout
42+
event = @queue.pop(non_block, timeout: timeout)
43+
else
44+
event = @queue.pop(non_block)
45+
end
46+
47+
return unless event
48+
49+
@input.read_nonblock(1)
50+
51+
return event
52+
rescue ::IO::WaitReadable
53+
return event if event
54+
55+
raise
56+
end
57+
end
58+
end
59+
end

lib/async/container/generic.rb

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,13 @@ def stopping?
106106
# Sleep until some state change occurs or the specified duration elapses.
107107
#
108108
# @parameter duration [Numeric] the maximum amount of time to sleep for.
109-
def sleep(duration = nil)
110-
@group.sleep(duration)
109+
def sleep(duration = nil, events = nil)
110+
@group.sleep(duration, events)
111111
end
112112

113113
# Wait until all spawned tasks are completed.
114-
def wait
115-
@group.wait
114+
def wait(events = nil)
115+
@group.wait(events)
116116
end
117117

118118
# Gracefully interrupt all child instances.
@@ -135,7 +135,7 @@ def status?(flag)
135135

136136
# Wait until all the children instances have indicated that they are ready.
137137
# @returns [Boolean] The children all became ready.
138-
def wait_until_ready
138+
def wait_until_ready(events = nil)
139139
while true
140140
Console.debug(self) do |buffer|
141141
buffer.puts "Waiting for ready:"
@@ -144,7 +144,9 @@ def wait_until_ready
144144
end
145145
end
146146

147-
self.sleep
147+
if self.sleep(nil, events) == :event
148+
return :event
149+
end
148150

149151
if self.status?(:ready)
150152
Console.debug(self) do |buffer|

lib/async/container/group.rb

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,16 @@ def empty?
5757
end
5858

5959
# Sleep for at most the specified duration until some state change occurs.
60-
def sleep(duration)
61-
self.wait_for_children(duration)
60+
def sleep(duration, events = nil)
61+
self.wait_for_children(duration, events)
6262
end
6363

6464
# Begin any outstanding queued processes and wait for them indefinitely.
65-
def wait
65+
def wait(events = nil)
6666
with_health_checks do |duration|
67-
self.wait_for_children(duration)
67+
if self.wait_for_children(duration, events) == :event
68+
return
69+
end
6870
end
6971
end
7072

@@ -211,30 +213,40 @@ def wait_for(channel)
211213

212214
protected
213215

214-
def wait_for_children(duration = nil)
216+
def wait_for_children(duration = nil, events = nil)
215217
# This log is a bit noisy and doesn't really provide a lot of useful information:
216218
Console.debug(self, "Waiting for children...", duration: duration, running: @running)
217219

218220
unless @running.empty?
219221
# Maybe consider using a proper event loop here:
220-
if ready = self.select(duration)
222+
if ready = self.select(duration, events)
223+
if events && ready.delete(events.io)
224+
result = :event
225+
end
226+
221227
ready.each do |io|
222228
if fiber = @running[io]
223229
# This method can be re-entered. While resuming a fiber, a policy hook may be invoked, which may invoke operations on the container. In that case, select may be called again on the same set of waiting fibers. On returning, those fibers may have already completed and removed themselves from @running, so we need to check for that.
224230
fiber.resume
225231
end
226232
end
233+
234+
return result
227235
end
228236
end
229237
end
230238

231239
# Wait for a child process to exit OR a signal to be received.
232-
def select(duration)
233-
::Thread.handle_interrupt(SignalException => :immediate) do
234-
readable, _, _ = ::IO.select(@running.keys, nil, nil, duration)
235-
236-
return readable
240+
def select(duration, events = nil)
241+
waiting = @running.keys
242+
243+
if events
244+
waiting << events.io
237245
end
246+
247+
readable, _, _ = ::IO.select(waiting, nil, nil, duration)
248+
249+
return readable
238250
end
239251
end
240252
end

lib/async/container/signals.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
# Released under the MIT License.
44
# Copyright, 2026, by Samuel Williams.
55

6+
require_relative "events"
7+
68
module Async
79
module Container
810
# Represents a collection of process signal handlers which enqueue events.
@@ -27,13 +29,13 @@ def call
2729
end
2830

2931
# Initialize the signal handler collection.
30-
# @parameter events [Thread::Queue] The queue used to receive signal events.
31-
def initialize(events = ::Thread::Queue.new)
32+
# @parameter events [Events] The queue used to receive signal events.
33+
def initialize(events = Events.new)
3234
@events = events
3335
@handlers = {}
3436
end
3537

36-
# @attribute [Thread::Queue] The queue used to receive signal events.
38+
# @attribute [Events] The queue used to receive signal events.
3739
attr :events
3840

3941
# Register a signal handler.

0 commit comments

Comments
 (0)