Skip to content

Commit 68617d3

Browse files
Simplify container supervision using Async
Assisted-By: devx/3236e566-7538-432e-a30a-2bdf37265ed4
1 parent cb5a852 commit 68617d3

7 files changed

Lines changed: 633 additions & 459 deletions

File tree

fixtures/async/container/a_container.rb

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,38 @@ module Container
7373
end
7474
end
7575

76+
with "Async compatibility" do
77+
it "can wait inside an Async task after spawning outside Async" do
78+
input, output = IO.pipe
79+
80+
container.spawn do
81+
output.write(".")
82+
end
83+
84+
Async do
85+
container.wait
86+
end
87+
88+
output.close
89+
expect(input.read).to be == "."
90+
end
91+
92+
it "can spawn and wait inside the same Async task" do
93+
input, output = IO.pipe
94+
95+
Async do
96+
container.spawn do
97+
output.write(".")
98+
end
99+
100+
container.wait
101+
end
102+
103+
output.close
104+
expect(input.read).to be == "."
105+
end
106+
end
107+
76108
it "should be blocking" do
77109
skip "Fiber.blocking? is not supported!" unless Fiber.respond_to?(:blocking?)
78110

lib/async/container/channel.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ def initialize(timeout: 1.0)
2525

2626
# Close the input end of the pipe.
2727
def close_read
28-
@in.close
28+
@in.close unless @in.closed?
29+
rescue IOError, Errno::EBADF
2930
end
3031

3132
# Close the output end of the pipe.
3233
def close_write
33-
@out.close
34+
@out.close unless @out.closed?
35+
rescue IOError, Errno::EBADF
3436
end
3537

3638
# Close both ends of the pipe.

lib/async/container/forked.rb

Lines changed: 47 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -101,23 +101,25 @@ def exec(*arguments, ready: true, **options)
101101
def self.fork(**options)
102102
# $stderr.puts fork: caller
103103
self.new(**options) do |process|
104-
::Process.fork do
105-
# We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly.
106-
Signal.trap(:INT){::Thread.current.raise(Interrupt)}
107-
Signal.trap(:TERM){::Thread.current.raise(Interrupt)} # Same as SIGINT.
108-
Signal.trap(:HUP){::Thread.current.raise(Restart)}
109-
110-
# This could be a configuration option:
111-
::Thread.handle_interrupt(SignalException => :immediate) do
112-
yield Instance.for(process)
113-
rescue Interrupt
114-
# Graceful exit.
115-
rescue Exception => error
116-
Console.error(self, error)
104+
::Thread.new do
105+
::Process.fork do
106+
# We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly.
107+
Signal.trap(:INT){::Thread.current.raise(Interrupt)}
108+
Signal.trap(:TERM){::Thread.current.raise(Interrupt)} # Same as SIGINT.
109+
Signal.trap(:HUP){::Thread.current.raise(Restart)}
117110

118-
exit!(1)
111+
# This could be a configuration option:
112+
::Thread.handle_interrupt(SignalException => :immediate) do
113+
yield Instance.for(process)
114+
rescue Interrupt
115+
# Graceful exit.
116+
rescue Exception => error
117+
Console.error(self, error)
118+
119+
exit!(1)
120+
end
119121
end
120-
end
122+
end.value
121123
end
122124
end
123125

@@ -151,6 +153,21 @@ def initialize(name: nil, **options)
151153
self.close_write
152154
end
153155

156+
# A minimal status for children reaped outside this object.
157+
class Status
158+
def success?
159+
true
160+
end
161+
162+
def to_i
163+
0
164+
end
165+
166+
def to_s
167+
"\#<#{self.class} success>"
168+
end
169+
end
170+
154171
# Convert the child process to a hash, suitable for serialization.
155172
#
156173
# @returns [Hash] The request as a hash.
@@ -236,24 +253,28 @@ def restart!
236253
# @parameter timeout [Numeric | Nil] Maximum time to wait before forceful termination.
237254
# @returns [::Process::Status] The process exit status.
238255
def wait(timeout = 0.1)
239-
if @pid && @status.nil?
240-
Console.debug(self, "Waiting for process to exit...", child: {process_id: @pid}, timeout: timeout)
241-
242-
_, @status = ::Process.wait2(@pid, ::Process::WNOHANG)
243-
244-
if @status.nil?
245-
sleep(timeout) if timeout
256+
begin
257+
if @pid && @status.nil?
258+
Console.debug(self, "Waiting for process to exit...", child: {process_id: @pid}, timeout: timeout)
246259

247260
_, @status = ::Process.wait2(@pid, ::Process::WNOHANG)
248261

249262
if @status.nil?
250-
Console.warn(self, "Process is blocking, sending kill signal...", child: {process_id: @pid}, timeout: timeout)
251-
self.kill!
263+
sleep(timeout) if timeout
264+
265+
_, @status = ::Process.wait2(@pid, ::Process::WNOHANG)
252266

253-
# Wait for the process to exit:
254-
_, @status = ::Process.wait2(@pid)
267+
if @status.nil?
268+
Console.warn(self, "Process is blocking, sending kill signal...", child: {process_id: @pid}, timeout: timeout)
269+
self.kill!
270+
271+
# Wait for the process to exit:
272+
_, @status = ::Process.wait2(@pid)
273+
end
255274
end
256275
end
276+
rescue Errno::ECHILD
277+
@status = Status.new
257278
end
258279

259280
Console.debug(self, "Process exited.", child: {process_id: @pid, status: @status})

lib/async/container/generic.rb

Lines changed: 98 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
# Copyright, 2025, by Marc-André Cournoyer.
66

77
require "etc"
8+
require "io/wait"
89
require "async/clock"
910

1011
require_relative "group"
@@ -144,8 +145,6 @@ def wait_until_ready
144145
end
145146
end
146147

147-
self.sleep
148-
149148
if self.status?(:ready)
150149
Console.debug(self) do |buffer|
151150
buffer.puts "All ready:"
@@ -156,6 +155,8 @@ def wait_until_ready
156155

157156
return true
158157
end
158+
159+
self.sleep
159160
end
160161
end
161162

@@ -222,13 +223,22 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu
222223
end
223224

224225
@statistics.spawn!
226+
started = ::Thread::Queue.new
225227

226-
fiber do
228+
@group.supervise do
229+
first = true
230+
227231
until @stopping
228232
Console.debug(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics)
229233

230234
child = self.start(name, &block)
231235
state = insert(key, child)
236+
@group.insert(child)
237+
238+
if first
239+
started << true
240+
first = false
241+
end
232242

233243
# Notify policy of spawn
234244
begin
@@ -247,36 +257,11 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu
247257
status = nil
248258

249259
begin
250-
status = @group.wait_for(child) do |message|
251-
case message
252-
when :health_check!
253-
if state[:ready]
254-
# If a health check timeout is specified, we will monitor the child process and terminate it if it does not update its state within the specified time.
255-
if health_check_timeout
256-
if health_check_timeout < age_clock.total
257-
health_check_failed(child, age_clock, health_check_timeout)
258-
end
259-
end
260-
else
261-
# If a startup timeout is specified, we will monitor the child process and terminate it if it does not become ready within the specified time.
262-
if startup_timeout
263-
if startup_timeout < age_clock.total
264-
startup_failed(child, age_clock, startup_timeout)
265-
end
266-
end
267-
end
268-
else
269-
state.update(message)
270-
271-
# Reset the age clock if the child has become ready:
272-
if state[:ready]
273-
age_clock&.reset!
274-
end
275-
end
276-
end
260+
status = wait_for(child, state, age_clock, health_check_timeout, startup_timeout)
277261
rescue => error
278262
Console.error(self, "Error during child process management!", exception: error, stopping: @stopping)
279263
ensure
264+
@group.delete(child)
280265
delete(key, child)
281266
end
282267

@@ -300,7 +285,17 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu
300285
break
301286
end
302287
end
303-
end.resume
288+
rescue => error
289+
started << error if first
290+
raise
291+
ensure
292+
started << false if first
293+
end
294+
295+
case result = started.pop
296+
when Exception
297+
raise result
298+
end
304299

305300
return true
306301
end
@@ -363,6 +358,78 @@ def key?(key)
363358

364359
protected
365360

361+
def wait_for(child, state, age_clock, health_check_timeout, startup_timeout)
362+
while true
363+
timeout = next_timeout(state, age_clock, health_check_timeout, startup_timeout)
364+
365+
if wait_readable(child, timeout)
366+
if message = child.receive
367+
state.update(message)
368+
369+
# Reset the age clock if the child has become ready:
370+
if state[:ready]
371+
age_clock&.reset!
372+
end
373+
374+
@group.health_check!
375+
else
376+
break
377+
end
378+
else
379+
break if check_timeout(child, state, age_clock, health_check_timeout, startup_timeout)
380+
end
381+
end
382+
383+
return child.wait
384+
end
385+
386+
def wait_readable(child, timeout)
387+
child.in.wait_readable(timeout)
388+
rescue IO::TimeoutError
389+
false
390+
end
391+
392+
def next_timeout(state, age_clock, health_check_timeout, startup_timeout)
393+
return nil unless age_clock
394+
395+
timeout = if state[:ready]
396+
health_check_timeout
397+
else
398+
startup_timeout
399+
end
400+
401+
if timeout
402+
remaining = timeout - age_clock.total
403+
return 0 if remaining.negative?
404+
405+
if interval = @group.health_check_interval
406+
return [remaining, interval].min
407+
else
408+
return remaining
409+
end
410+
elsif interval = @group.health_check_interval
411+
return interval
412+
end
413+
end
414+
415+
def check_timeout(child, state, age_clock, health_check_timeout, startup_timeout)
416+
return false unless age_clock
417+
418+
if state[:ready]
419+
if health_check_timeout && health_check_timeout < age_clock.total
420+
health_check_failed(child, age_clock, health_check_timeout)
421+
return true
422+
end
423+
else
424+
if startup_timeout && startup_timeout < age_clock.total
425+
startup_failed(child, age_clock, startup_timeout)
426+
return true
427+
end
428+
end
429+
430+
return false
431+
end
432+
366433
# Register the child (value) as running.
367434
def insert(key, child)
368435
if key
@@ -385,17 +452,6 @@ def delete(key, child)
385452
@state.delete(child)
386453
end
387454

388-
private
389-
390-
if Fiber.respond_to?(:blocking?)
391-
def fiber(&block)
392-
Fiber.new(blocking: true, &block)
393-
end
394-
else
395-
def fiber(&block)
396-
Fiber.new(&block)
397-
end
398-
end
399455
end
400456
end
401457
end

0 commit comments

Comments
 (0)